diff --git a/typescript/aurora-eventual-s3-data-load/.gitignore b/typescript/aurora-eventual-s3-data-load/.gitignore new file mode 100644 index 0000000000..2bc29141c4 --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/.gitignore @@ -0,0 +1,11 @@ +*.js +!jest.config.js +*.d.ts +node_modules + +# CDK asset staging directory +.cdk.staging +cdk.out + +.idea/ +package-lock.json diff --git a/typescript/aurora-eventual-s3-data-load/.npmignore b/typescript/aurora-eventual-s3-data-load/.npmignore new file mode 100644 index 0000000000..c1d6d45dcf --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/.npmignore @@ -0,0 +1,6 @@ +*.ts +!*.d.ts + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/typescript/aurora-eventual-s3-data-load/README.md b/typescript/aurora-eventual-s3-data-load/README.md new file mode 100644 index 0000000000..3ae01685bb --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/README.md @@ -0,0 +1,118 @@ +# demo-aurora-eventual-s3-data-load + +## Project Overview + +This project implements a robust, event-driven architecture that seamlessly ingests `CSV` files uploaded to an _Amazon +S3_ bucket into an _Aurora MySQL_ database. It leverages the powerful `LOAD` command of the _MySQL_ engine to +efficiently load the data into the target database. + +Key Features: + +1. **Event-Driven Architecture**: The system is designed to react to events triggered by `CSV` file uploads to the + designated _S3_ bucket. This ensures real-time data ingestion and minimizes manual intervention. + +2. **Reliable Data Ingestion**: The `LOAD` command is utilized to efficiently and reliably load the `CSV` data into the + _Aurora MySQL_ database, ensuring data integrity and consistency. + +3. **Error Handling and Notifications**: In the event of errors or warnings during the data ingestion process, the + system captures and stores the error details in an _SQS Dead Letter Queue_ (DLQ). This facilitates subsequent + analysis and troubleshooting. 
Additionally, an alarm is triggered to notify the relevant stakeholders, enabling
+   prompt resolution of issues.
+
+4. **Monitoring and Observability**: The system incorporates monitoring capabilities, allowing you to track
+   transactions, overall health, and potential issues in the data ingestion pipeline.
+
+## Architecture
+
+The architecture consists of the following components:
+
+1. An _S3_ bucket that will store the `CSV` file and notify the subsequent components.
+2. An _SQS_ queue to store the event notifications from the _S3_ bucket.
+3. An _SQS_ queue to store any failed attempts at processing the files (_Dead-letter queue_).
+4. A _Lambda_ function deployed in the same _VPC_ as the database. This function will consume the message from the
+   _SQS_ queue and trigger the database `LOAD` process, providing the data file details.
+5. An _Aurora MySQL_ database where the data is persisted and the new data will be loaded. The database is responsible
+   for fetching the data file from the _S3_ bucket through the `LOAD` command.
+6. A _CloudWatch Alarm_ that will monitor the `NumberOfMessagesReceived` metric of the _DLQ_.
+7. An _SNS_ topic that the alarm publishes to whenever new messages arrive in the _DLQ_. The stakeholders can
+   subscribe to this topic directly using their e-mail addresses or mobile phone numbers. It's also possible to create
+   a custom integration that will allow the architecture to notify the related stakeholders using third-party
+   components.
+8. _CloudWatch Logs_ will capture and store all the logs produced by the _Lambda_ functions for further analysis.
+
+![architecture.png](docs/architecture.png)
+
+## Deployment Guide
+
+### Prerequisites
+
+- AWS CLI installed and configured with appropriate IAM permissions
+- Node.js installed (version 22.4 or later)
+- AWS CDK installed (version 2.160 or later)
+
+### CDK Toolkit
+
+The `cdk.json` file tells the CDK Toolkit how to execute your app.
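+For context before deploying: the heart of the pipeline (component 5 in the architecture above) is the Aurora MySQL-specific `LOAD DATA FROM S3` statement that the Lambda function issues against the cluster. A minimal sketch of how that statement is assembled from the uploaded object's bucket and key, mirroring `lambda/function/data-load-function.py` (the helper name and the sample bucket/key values are illustrative):

```python
# Illustrative only: how the Lambda builds the Aurora MySQL
# "LOAD DATA FROM S3" statement from the uploaded object's location.
# The bucket and key below are placeholders, not real resources.

def build_load_statement(bucket: str, key: str) -> str:
    bucket_object = f"s3://{bucket}/{key}"
    return (
        "LOAD DATA "
        f"FROM S3 '{bucket_object}' "
        "REPLACE "                       # replace rows with duplicate primary keys
        "INTO TABLE test "
        "FIELDS TERMINATED BY ';' "      # the sample CSV files use ';' separators
        "LINES TERMINATED BY '\n';"
    )

print(build_load_statement("data-bucket-123456789012", "db-data.csv"))
```

Note that Aurora executes this statement itself, fetching the object directly from S3 through the IAM role associated with the cluster (granted via `s3ImportBuckets` in the stack), so the file contents never pass through the Lambda function.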
+
+To start working with the project, first install all dependencies, as well as the CDK Toolkit itself (if not already
+installed). In the project directory, run:
+
+```bash
+$ npm install
+```
+
+### Deploying the solution
+
+To deploy the solution, ask the CDK CLI to deploy all stacks:
+
+```shell
+$ cdk deploy --all
+```
+
+> **Note**: After the deployment finishes, some outputs are presented on the terminal, providing information about
+> the deployed solution:
+> - **DataBucketName**: S3 bucket where the data files will be uploaded.
+> - **DataLoadQueueName**: Queue responsible for storing the events sent from S3.
+> - **DLQName**: Dead-letter queue responsible for storing failed events.
+> - **NotificationTopicName**: SNS topic responsible for notifying the stakeholders regarding failed processes.
+> - **FunctionLogGroupName**: CloudWatch Log Group responsible for storing the Lambda function's logs.
+
+```shell
+Outputs:
+DemoAuroraEventualDataLoadStack.BastionHostSecurityGroupId = sg-XXXXXXXXXXX
+DemoAuroraEventualDataLoadStack.DLQName = demo-data-load-dlq
+DemoAuroraEventualDataLoadStack.DataBucketName = data-bucket-XXXXXXXXXXX
+DemoAuroraEventualDataLoadStack.DataLoadQueueName = demo-data-load
+DemoAuroraEventualDataLoadStack.DatabaseSecretName = demo-aurora-eventual-load-database-secret
+DemoAuroraEventualDataLoadStack.FunctionLogGroupName = /aws/lambda/demo-aurora-eventual-data-load-function
+DemoAuroraEventualDataLoadStack.NotificationTopicName = demo-aurora-eventual-load-notification
+DemoAuroraEventualDataLoadStack.VpcId = vpc-XXXXXXXXXXX
+```
+
+## Testing the solution
+
+1. Head to the _AWS_ console and then to _S3_.
+2. Select the bucket reported by the deployment command and click `Upload`.
+3. Select one of the files from this repo's _/data_ directory.
+   1. `db-data.csv` will load successfully into the database and will generate enough logs for you to check the number
+      of rows loaded.
+   2.
`db-data-with-error.csv` will produce some errors and will deliver the message to the _DLQ_, which will trigger
+      the alarm and send a message to the alarm topic. This will also generate enough logs to help you understand the
+      errors encountered.
+4. You can check the logs produced by the solution using the _CloudWatch Log Group_ provided after the deployment
+   process.
+5. Whenever you want to test the failure scenario, after uploading the data file with errors you will see the failure
+   logs in the _CloudWatch Log Group_ and the alarm in the "In Alarm" state in the _CloudWatch Alarms_ section.
+6. **(OPTIONAL)** You can subscribe your e-mail address to the _SNS Notification Topic_ and validate the e-mail sent on
+   the failure event.
+7. **(OPTIONAL)** You can access the database by deploying an _EC2_ bastion host or _CloudShell_ session inside the
+   created _VPC_ (see the `VpcId` output). You will need to install the _MySQL_ client and use the database credentials
+   stored in _Secrets Manager_ (see the `DatabaseSecretName` output).
+
+## Cleanup
+
+To destroy the provisioned infrastructure, simply run the following command:
+
+```shell
+$ cdk destroy --all
+```
diff --git a/typescript/aurora-eventual-s3-data-load/bin/demo-aurora-eventual-data-load.ts b/typescript/aurora-eventual-s3-data-load/bin/demo-aurora-eventual-data-load.ts
new file mode 100644
index 0000000000..373d863dd1
--- /dev/null
+++ b/typescript/aurora-eventual-s3-data-load/bin/demo-aurora-eventual-data-load.ts
@@ -0,0 +1,21 @@
+#!/usr/bin/env node
+import 'source-map-support/register';
+import * as cdk from 'aws-cdk-lib';
+import { DemoAuroraEventualDataLoadStack } from '../lib/demo-aurora-eventual-data-load-stack';
+
+const app = new cdk.App();
+new DemoAuroraEventualDataLoadStack(app, 'DemoAuroraEventualDataLoadStack', {
+  /* If you don't specify 'env', this stack will be environment-agnostic.
+ * Account/Region-dependent features and context lookups will not work, + * but a single synthesized template can be deployed anywhere. */ + + /* Uncomment the next line to specialize this stack for the AWS Account + * and Region that are implied by the current CLI configuration. */ + // env: { account: process.env.CDK_DEFAULT_ACCOUNT, region: process.env.CDK_DEFAULT_REGION }, + + /* Uncomment the next line if you know exactly what Account and Region you + * want to deploy the stack to. */ + // env: { account: '123456789012', region: 'us-east-1' }, + + /* For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html */ +}); \ No newline at end of file diff --git a/typescript/aurora-eventual-s3-data-load/cdk.json b/typescript/aurora-eventual-s3-data-load/cdk.json new file mode 100644 index 0000000000..2234fa7ca2 --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/cdk.json @@ -0,0 +1,72 @@ +{ + "app": "npx ts-node --prefer-ts-exts bin/demo-aurora-eventual-data-load.ts", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "**/*.d.ts", + "**/*.js", + "tsconfig.json", + "package*.json", + "yarn.lock", + "node_modules", + "test" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + 
"@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true, + "@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true, + "@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true, + "@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true, + "@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true, + "@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": 
true, + "@aws-cdk/aws-eks:nodegroupNameAttribute": true, + "@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true, + "@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true, + "@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false, + "@aws-cdk/aws-s3:keepNotificationInImportedBucket": false + } +} diff --git a/typescript/aurora-eventual-s3-data-load/data/db-data-with-error.csv b/typescript/aurora-eventual-s3-data-load/data/db-data-with-error.csv new file mode 100644 index 0000000000..95f90fb597 --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/data/db-data-with-error.csv @@ -0,0 +1,4 @@ +a1;Teste;aaaa +a2;Teste 2;bbbb +a30;Teste 30;cccc +a33;Teste 33;dddd diff --git a/typescript/aurora-eventual-s3-data-load/data/db-data.csv b/typescript/aurora-eventual-s3-data-load/data/db-data.csv new file mode 100644 index 0000000000..cdb75e2f0f --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/data/db-data.csv @@ -0,0 +1,4 @@ +1;Teste +2;Teste 2 +30;Teste 30 +33;Teste 33 diff --git a/typescript/aurora-eventual-s3-data-load/docs/architecture.drawio b/typescript/aurora-eventual-s3-data-load/docs/architecture.drawio new file mode 100644 index 0000000000..e4949b6f92 --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/docs/architecture.drawio @@ -0,0 +1,154 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/typescript/aurora-eventual-s3-data-load/docs/architecture.png b/typescript/aurora-eventual-s3-data-load/docs/architecture.png new file mode 100644 index 0000000000..6aea092d4f Binary files /dev/null and b/typescript/aurora-eventual-s3-data-load/docs/architecture.png differ diff --git a/typescript/aurora-eventual-s3-data-load/jest.config.js 
b/typescript/aurora-eventual-s3-data-load/jest.config.js
new file mode 100644
index 0000000000..08263b8954
--- /dev/null
+++ b/typescript/aurora-eventual-s3-data-load/jest.config.js
@@ -0,0 +1,8 @@
+module.exports = {
+  testEnvironment: 'node',
+  roots: ['<rootDir>/test'],
+  testMatch: ['**/*.test.ts'],
+  transform: {
+    '^.+\\.tsx?$': 'ts-jest'
+  }
+};
diff --git a/typescript/aurora-eventual-s3-data-load/lambda/function/data-load-function.py b/typescript/aurora-eventual-s3-data-load/lambda/function/data-load-function.py
new file mode 100644
index 0000000000..8f2cab8f55
--- /dev/null
+++ b/typescript/aurora-eventual-s3-data-load/lambda/function/data-load-function.py
@@ -0,0 +1,116 @@
+import boto3
+import json
+import os
+import pymysql
+import sys
+from aws_lambda_powertools import Logger
+
+logger = Logger(level="INFO", log_uncaught_exceptions=True)
+
+# Loading database credentials from Secrets Manager
+secrets_manager = boto3.client('secretsmanager')
+secret = json.loads(secrets_manager.get_secret_value(SecretId=os.environ['SECRETS_ID'])['SecretString'])
+
+try:
+    logger.info("Connecting to the database...")
+
+    # Establishing connection to the database
+    conn = pymysql.connect(host=str(secret['host']),
+                           user=str(secret['username']),
+                           passwd=str(secret['password']),
+                           db=str(secret['dbname']),
+                           connect_timeout=15)
+
+    # Creating a cursor object
+    cursor = conn.cursor()
+
+except pymysql.MySQLError as e:
+    logger.error({
+        "description": "ERROR: Unexpected error: Could not connect to MySQL instance.",
+        "error": e
+    })
+    sys.exit(1)
+
+logger.info("SUCCESS: Connection to RDS for MySQL instance succeeded")
+
+
+# Capturing the SQS MessageId attribute and defining it as the log's CorrelationID
+@logger.inject_lambda_context(correlation_id_path="Records[0].messageId")
+def handler(event, context):
+    try:
+        # Retrieving the S3 object path from the SQS message
+        message = json.loads(event['Records'][0]['body'])
+        s3 = message['Records'][0]['s3']
+        bucket_object = "s3://" + s3['bucket']['name'] + "/" + s3['object']['key']
+
+        logger.debug("Object to fetch: " + bucket_object)
+
+        # Starting the DB transaction
+        conn.begin()
+
+        # Creating the table if it doesn't exist
+        logger.info("Creating the table if it doesn't exist")
+
+        create_table_statement = (
+            "CREATE TABLE IF NOT EXISTS test (id int primary key auto_increment not null, description varchar(40) not null)")
+        logger.debug(create_table_statement)
+        cursor.execute(create_table_statement)
+
+        logger.info("Table created successfully")
+
+        # Loading data from S3 into the table
+        logger.info("Loading data from S3 into the table")
+
+        load_statement = ("LOAD DATA "
+                          "FROM S3 '" + bucket_object + "' "
+                          "REPLACE "
+                          "INTO TABLE test "
+                          "FIELDS TERMINATED BY ';' "
+                          "LINES TERMINATED BY '\n';")
+        logger.debug(load_statement)
+        cursor.execute(load_statement)
+
+        logger.info("Data loaded successfully")
+
+        affected_rows = conn.affected_rows()
+
+        logger.info("Affected rows: " + str(affected_rows))
+
+        warnings = conn.show_warnings()
+
+        logger.info("Warnings: " + str(warnings))
+
+        if len(warnings) > 0:
+            # Raising a warning exception in case of any warnings
+            raise pymysql.Warning(warnings)
+
+        # Committing the transaction
+        conn.commit()
+
+        logger.info("Load process executed successfully")
+
+    except pymysql.Warning as e:
+        conn.rollback()
+
+        message = "There were warnings during the LOAD process. Transaction rolled back"
+        logger.error({
+            "description": message,
+            "warnings": e
+        })
+
+        # Re-raise the exception to propagate it further
+        raise Exception(message)
+
+    except pymysql.MySQLError as e:
+        conn.rollback()
+
+        message = "There were errors during the LOAD process. Transaction rolled back"
+        logger.error({
+            "description": message,
+            "error": e
+        })
+
+        # Re-raise the exception to propagate it further
+        raise Exception(message)
+
+    return "Load process executed successfully"
diff --git a/typescript/aurora-eventual-s3-data-load/lambda/layer/pymysql_lambda_layer.zip b/typescript/aurora-eventual-s3-data-load/lambda/layer/pymysql_lambda_layer.zip
new file mode 100644
index 0000000000..43c5e4082f
Binary files /dev/null and b/typescript/aurora-eventual-s3-data-load/lambda/layer/pymysql_lambda_layer.zip differ
diff --git a/typescript/aurora-eventual-s3-data-load/lib/demo-aurora-eventual-data-load-stack.ts b/typescript/aurora-eventual-s3-data-load/lib/demo-aurora-eventual-data-load-stack.ts
new file mode 100644
index 0000000000..55004ffa00
--- /dev/null
+++ b/typescript/aurora-eventual-s3-data-load/lib/demo-aurora-eventual-data-load-stack.ts
@@ -0,0 +1,285 @@
+import * as cdk from 'aws-cdk-lib';
+import {Duration, RemovalPolicy} from 'aws-cdk-lib';
+import {Construct} from 'constructs';
+import {
+    GatewayVpcEndpointAwsService,
+    InstanceClass,
+    InstanceSize,
+    InstanceType,
+    InterfaceVpcEndpointAwsService,
+    Peer,
+    Port,
+    SecurityGroup,
+    SubnetType,
+    Vpc
+} from "aws-cdk-lib/aws-ec2";
+import {Bucket, EventType} from "aws-cdk-lib/aws-s3";
+import {Architecture, Code, Function, LayerVersion, LoggingFormat, Runtime} from "aws-cdk-lib/aws-lambda";
+import {RetentionDays} from "aws-cdk-lib/aws-logs";
+import {Queue} from "aws-cdk-lib/aws-sqs";
+import {Alarm, ComparisonOperator, TreatMissingData} from "aws-cdk-lib/aws-cloudwatch";
+import {SnsAction} from "aws-cdk-lib/aws-cloudwatch-actions";
+import {Topic} from "aws-cdk-lib/aws-sns";
+import {SqsDestination} from "aws-cdk-lib/aws-s3-notifications";
+import {AwsCustomResource, AwsCustomResourcePolicy, PhysicalResourceId} from "aws-cdk-lib/custom-resources";
+import {
+    AuroraMysqlEngineVersion,
+    ClusterInstance,
+    Credentials,
+    DatabaseCluster,
+    DatabaseClusterEngine,
DBClusterStorageType
+} from "aws-cdk-lib/aws-rds";
+
+export class DemoAuroraEventualDataLoadStack extends cdk.Stack {
+    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
+        super(scope, id, props);
+
+        // Create the S3 Bucket to accommodate the data files used during the LOAD process.
+        const dataBucket = new Bucket(this, 'DataBucket', {
+            bucketName: `data-bucket-${this.account}`,
+            removalPolicy: RemovalPolicy.DESTROY,
+            autoDeleteObjects: true,
+            enforceSSL: true
+        })
+
+
+        // Create the VPC where the workload will be allocated.
+        const vpc = new Vpc(this, 'DemoVpc', {
+            vpcName: 'demo-aurora-eventual-load-vpc',
+            maxAzs: 2,
+            enableDnsHostnames: true,
+            enableDnsSupport: true,
+            subnetConfiguration: [
+                {
+                    cidrMask: 24,
+                    name: 'public-subnet',
+                    subnetType: SubnetType.PUBLIC
+                },
+                {
+                    cidrMask: 24,
+                    name: 'private-data-subnet',
+                    subnetType: SubnetType.PRIVATE_ISOLATED
+                }
+            ],
+            gatewayEndpoints: {
+                'S3Endpoint': {
+                    service: GatewayVpcEndpointAwsService.S3,
+                    subnets: [
+                        {subnetType: SubnetType.PRIVATE_ISOLATED}
+                    ]
+                }
+            },
+        })
+        vpc.addInterfaceEndpoint('SecretsManagerEndpoint', {
+            service: InterfaceVpcEndpointAwsService.SECRETS_MANAGER,
+            subnets: {
+                subnetType: SubnetType.PRIVATE_ISOLATED
+            }
+        })
+
+
+        // Create Security Groups that will be associated with the DB instance and Lambda function.
+        const lambdaSecurityGroup = new SecurityGroup(this, 'LambdaSecurityGroup', {
+            vpc: vpc,
+            securityGroupName: 'demo-lambda-sg',
+            allowAllOutbound: true,
+        })
+
+        const databaseSecurityGroup = new SecurityGroup(this, 'DatabaseSecurityGroup', {
+            vpc: vpc,
+            securityGroupName: 'demo-database-sg',
+            allowAllOutbound: true,
+        })
+        databaseSecurityGroup.addIngressRule(Peer.securityGroupId(lambdaSecurityGroup.securityGroupId), Port.tcp(3306))
+
+        // If you need a bastion host to explore the DB instance, add this security group.
+ const bastionSecurityGroup = new SecurityGroup(this, 'BastionSecurityGroup', { + vpc: vpc, + securityGroupName: 'demo-bastion-sg', + allowAllOutbound: true, + }) + bastionSecurityGroup.addIngressRule(Peer.anyIpv4(), Port.tcp(22)) + databaseSecurityGroup.addIngressRule(Peer.securityGroupId(bastionSecurityGroup.securityGroupId), Port.tcp(3306)) + + + // Create the Aurora MySQL Database. + const databaseCluster = new DatabaseCluster(this, 'DemoAuroraDatabase', { + clusterIdentifier: 'demo-aurora-eventual-load', + defaultDatabaseName: 'demo', + engine: DatabaseClusterEngine.auroraMysql({version: AuroraMysqlEngineVersion.VER_3_07_1}), + credentials: Credentials.fromGeneratedSecret('admin', { + secretName: 'demo-aurora-eventual-load-database-secret' + }), + writer: ClusterInstance.provisioned('DemoAuroraDatabase', { + instanceType: InstanceType.of(InstanceClass.BURSTABLE4_GRAVITON, InstanceSize.MEDIUM), + instanceIdentifier: 'demo-aurora-eventual-load-writer' + }), + vpc: vpc, + vpcSubnets: vpc.selectSubnets({subnetType: SubnetType.PRIVATE_ISOLATED}), + securityGroups: [ + databaseSecurityGroup + ], + deletionProtection: false, + removalPolicy: RemovalPolicy.DESTROY, + s3ImportBuckets: [dataBucket], + storageType: DBClusterStorageType.AURORA + }); + + + // Create the SQS queue and DLQ to receive the S3 notifications. 
+        const dataLoadDlq = new Queue(this, 'DataLoadDlq', {
+            queueName: 'demo-data-load-dlq',
+            retentionPeriod: Duration.days(2),
+            visibilityTimeout: Duration.seconds(15),
+            removalPolicy: RemovalPolicy.DESTROY,
+        });
+
+        const dataLoadQueue = new Queue(this, 'DataLoadQueue', {
+            queueName: 'demo-data-load',
+            retentionPeriod: Duration.days(2),
+            visibilityTimeout: Duration.seconds(30),
+            removalPolicy: RemovalPolicy.DESTROY,
+            deadLetterQueue: {
+                queue: dataLoadDlq,
+                maxReceiveCount: 2
+            }
+        });
+        dataBucket.addEventNotification(EventType.OBJECT_CREATED, new SqsDestination(dataLoadQueue), {
+            suffix: 'csv'
+        });
+
+
+        // The resource below issues an SQS PurgeQueue call to erase all the existing messages in the queue.
+        // This is done to prevent the Lambda function from consuming the S3 TestEvent sent when the S3-to-SQS
+        // notification is created. https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html#supported-notification-event-types
+        const purgeS3TestEvent = new AwsCustomResource(this, 'PurgeS3TestEvent', {
+            onCreate: {
+                service: 'SQS',
+                action: 'purgeQueue',
+                parameters: {
+                    QueueUrl: dataLoadQueue.queueUrl,
+                },
+                physicalResourceId: PhysicalResourceId.of('PurgeS3TestEvent')
+            },
+            policy: AwsCustomResourcePolicy.fromSdkCalls({
+                resources: AwsCustomResourcePolicy.ANY_RESOURCE,
+            })
+        });
+        purgeS3TestEvent.node.addDependency(dataLoadQueue, dataBucket)
+
+
+        // Create the Lambda function to trigger the LOAD command in the database.
+        // As this function requires the PyMySQL library, we create a Lambda layer to accommodate it. In addition, we
+        // also associate the AWSLambdaPowertoolsPythonV2 layer to use out-of-the-box best-practice utilities inside
+        // the function.
+ const pymysqlLambdaLayer = new LayerVersion(this, 'PyMysqlLayer', { + layerVersionName: 'pymysql-python-layer', + code: Code.fromAsset('./lambda/layer/pymysql_lambda_layer.zip'), + compatibleArchitectures: [ + Architecture.ARM_64 + ], + compatibleRuntimes: [ + Runtime.PYTHON_3_12 + ] + }) + + const loadFunction = new Function(this, 'LoadDataFunction', { + functionName: 'demo-aurora-eventual-data-load-function', + code: Code.fromAsset('./lambda/function'), + handler: "data-load-function.handler", + runtime: Runtime.PYTHON_3_12, + architecture: Architecture.ARM_64, + environment: { + 'SECRETS_ID': databaseCluster.secret!.secretName, + 'DATABASE_URL': databaseCluster.clusterReadEndpoint.hostname, + 'DATABASE_PORT': databaseCluster.clusterReadEndpoint.port.toString() + }, + events: [], + layers: [ + pymysqlLambdaLayer, + LayerVersion.fromLayerVersionArn(this, 'AWSLambdaPowertoolsPythonV2', 'arn:aws:lambda:us-east-1:017000801446:layer:AWSLambdaPowertoolsPythonV2-Arm64:78') + ], + logRetention: RetentionDays.ONE_DAY, + loggingFormat: LoggingFormat.JSON, + memorySize: 512, + securityGroups: [ + lambdaSecurityGroup + ], + timeout: Duration.seconds(15), + vpc: vpc, + vpcSubnets: vpc.selectSubnets({subnetType: SubnetType.PRIVATE_ISOLATED}) + }) + databaseCluster.secret!.grantRead(loadFunction) + dataLoadQueue.grantConsumeMessages(loadFunction); + loadFunction.node.addDependency(purgeS3TestEvent) + loadFunction.addEventSourceMapping('SqsNotification', { + enabled: true, + batchSize: 1, + eventSourceArn: dataLoadQueue.queueArn, + }); + + + // Create an SNS topic to notify the users whenever an error occurred during the LOAD process. + const deadLetterNotification = new Topic(this, 'DeadLetterNotification', { + topicName: 'demo-aurora-eventual-load-notification' + }); + + + // Create a CloudWatch Alarm to monitor new messages in the DLQ, indicating errors during the LOAD process. The + // alarm will send a notification through SNS whenever new messages appear in the DLQ. 
+ const deadLetterQueueAlarm = new Alarm(this, 'DeadLetterAlarm', { + alarmName: 'demo-aurora-eventual-load-dlq-alarm', + alarmDescription: 'Alarm triggered during the LOAD process', + actionsEnabled: true, + metric: dataLoadDlq.metricNumberOfMessagesReceived(), + evaluationPeriods: 1, + datapointsToAlarm: 1, + threshold: 0, + comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD, + treatMissingData: TreatMissingData.MISSING, + }); + deadLetterQueueAlarm.addAlarmAction(new SnsAction(deadLetterNotification)); + + + // Outputs + new cdk.CfnOutput(this, 'DataBucketName', { + value: dataBucket.bucketName, + description: 'Data Bucket Name' + }); + + new cdk.CfnOutput(this, 'DataLoadQueueName', { + value: dataLoadQueue.queueName, + description: 'Data Load Queue Name' + }); + + new cdk.CfnOutput(this, 'DLQName', { + value: dataLoadDlq.queueName, + description: 'Dead-letter Queue Name' + }); + + new cdk.CfnOutput(this, 'NotificationTopicName', { + value: deadLetterNotification.topicName, + description: 'Notification Topic Name' + }); + + new cdk.CfnOutput(this, 'FunctionLogGroupName', { + value: loadFunction.logGroup.logGroupName, + description: 'Function Log Group Name' + }); + + new cdk.CfnOutput(this, 'BastionHostSecurityGroupId', { + value: bastionSecurityGroup.securityGroupId, + description: 'Bastion Host Security Group ID' + }); + + new cdk.CfnOutput(this, 'VpcId', { + value: vpc.vpcId, + description: 'VPC ID' + }); + + new cdk.CfnOutput(this, 'DatabaseSecretName', { + value: databaseCluster.secret!.secretName, + description: 'Database Secret Name' + }); + } +} diff --git a/typescript/aurora-eventual-s3-data-load/package.json b/typescript/aurora-eventual-s3-data-load/package.json new file mode 100644 index 0000000000..5742753bf2 --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/package.json @@ -0,0 +1,27 @@ +{ + "name": "demo-aurora-eventual-data-load", + "version": "0.1.0", + "bin": { + "demo-aurora-eventual-data-load": 
"bin/demo-aurora-eventual-data-load.js" + }, + "scripts": { + "build": "tsc", + "watch": "tsc -w", + "test": "jest", + "cdk": "cdk" + }, + "devDependencies": { + "@types/jest": "^29.5.12", + "@types/node": "22.5.4", + "jest": "^29.7.0", + "ts-jest": "^29.2.5", + "aws-cdk": "2.158.0", + "ts-node": "^10.9.2", + "typescript": "~5.6.2" + }, + "dependencies": { + "aws-cdk-lib": "2.158.0", + "constructs": "^10.0.0", + "source-map-support": "^0.5.21" + } +} \ No newline at end of file diff --git a/typescript/aurora-eventual-s3-data-load/test/demo-aurora-eventual-data-load.test.ts b/typescript/aurora-eventual-s3-data-load/test/demo-aurora-eventual-data-load.test.ts new file mode 100644 index 0000000000..844364d1f9 --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/test/demo-aurora-eventual-data-load.test.ts @@ -0,0 +1,132 @@ +import * as cdk from 'aws-cdk-lib'; +import {Match, Template} from 'aws-cdk-lib/assertions'; +import {DemoAuroraEventualDataLoadStack} from '../lib/demo-aurora-eventual-data-load-stack'; + +describe('DemoAuroraEventualDataLoadStack', () => { + let app: cdk.App; + let stack: DemoAuroraEventualDataLoadStack; + let template: Template; + + beforeEach(() => { + app = new cdk.App(); + stack = new DemoAuroraEventualDataLoadStack(app, 'TestStack'); + template = Template.fromStack(stack); + }); + + test('S3 bucket is created with correct configuration', () => { + template.hasResourceProperties('AWS::S3::Bucket', { + BucketName: { + 'Fn::Join': [ + '', + [ + 'data-bucket-', + {Ref: 'AWS::AccountId'} + ] + ] + } + }); + }); + + test('VPC is created with correct configuration', () => { + template.hasResourceProperties('AWS::EC2::VPC', { + EnableDnsHostnames: true, + EnableDnsSupport: true, + Tags: [ + { + Key: 'Name', + Value: 'demo-aurora-eventual-load-vpc' + } + ] + }); + + // Verify subnet configuration + template.resourceCountIs('AWS::EC2::Subnet', 4); // 2 public + 2 private subnets + }); + + test('Security groups are created with correct rules', () 
=> { + // Database security group + template.hasResourceProperties('AWS::EC2::SecurityGroup', { + GroupName: 'demo-database-sg', + SecurityGroupIngress: Match.arrayWith([ + Match.objectLike({ + FromPort: 3306, + IpProtocol: 'tcp', + ToPort: 3306 + }) + ]) + }); + + // Lambda security group + template.hasResourceProperties('AWS::EC2::SecurityGroup', { + GroupName: 'demo-lambda-sg', + SecurityGroupEgress: [ + { + CidrIp: '0.0.0.0/0', + Description: 'Allow all outbound traffic by default', + IpProtocol: '-1' + } + ] + }); + }); + + test('Aurora cluster is created with correct configuration', () => { + template.hasResourceProperties('AWS::RDS::DBCluster', { + Engine: 'aurora-mysql', + DatabaseName: 'demo', + DBClusterIdentifier: 'demo-aurora-eventual-load', + DeletionProtection: false, + }); + }); + + test('SQS queues are created with correct configuration', () => { + // Main queue + template.hasResourceProperties('AWS::SQS::Queue', { + QueueName: 'demo-data-load', + MessageRetentionPeriod: 172800, // 2 days + VisibilityTimeout: 30 + }); + + // DLQ + template.hasResourceProperties('AWS::SQS::Queue', { + QueueName: 'demo-data-load-dlq', + MessageRetentionPeriod: 172800, + VisibilityTimeout: 15 + }); + }); + + test('Lambda function is created with correct configuration', () => { + template.hasResourceProperties('AWS::Lambda::Function', { + FunctionName: 'demo-aurora-eventual-data-load-function', + Handler: 'data-load-function.handler', + Runtime: 'python3.12', + MemorySize: 512, + Timeout: 15, + Architectures: ['arm64'], + VpcConfig: Match.anyValue(), + Layers: Match.arrayWith([ + Match.objectLike({ + Ref: Match.stringLikeRegexp('^PyMysqlLayer*') + }), + 'arn:aws:lambda:us-east-1:017000801446:layer:AWSLambdaPowertoolsPythonV2-Arm64:78' + ]) + }); + }); + + test('CloudWatch alarm is created with correct configuration', () => { + template.hasResourceProperties('AWS::CloudWatch::Alarm', { + AlarmName: 'demo-aurora-eventual-load-dlq-alarm', + AlarmDescription: 'Alarm 
triggered during the LOAD process', + EvaluationPeriods: 1, + DatapointsToAlarm: 1, + Threshold: 0, + ComparisonOperator: 'GreaterThanThreshold' + }); + }); + + test('SNS topic is created', () => { + template.hasResourceProperties('AWS::SNS::Topic', { + TopicName: 'demo-aurora-eventual-load-notification' + }); + }); + +}); diff --git a/typescript/aurora-eventual-s3-data-load/tsconfig.json b/typescript/aurora-eventual-s3-data-load/tsconfig.json new file mode 100644 index 0000000000..aaa7dc510f --- /dev/null +++ b/typescript/aurora-eventual-s3-data-load/tsconfig.json @@ -0,0 +1,31 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": [ + "es2020", + "dom" + ], + "declaration": true, + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "noImplicitThis": true, + "alwaysStrict": true, + "noUnusedLocals": false, + "noUnusedParameters": false, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": false, + "inlineSourceMap": true, + "inlineSources": true, + "experimentalDecorators": true, + "strictPropertyInitialization": false, + "typeRoots": [ + "./node_modules/@types" + ] + }, + "exclude": [ + "node_modules", + "cdk.out" + ] +}