|
| 1 | +import { CloudWatchLogsAction, LambdaFunctionAction } from '@aws-cdk/aws-iot-actions-alpha'; |
| 2 | +import { IotSql, TopicRule } from '@aws-cdk/aws-iot-alpha'; |
| 3 | +import { CfnOutput, RemovalPolicy, Stack, StackProps } from 'aws-cdk-lib'; |
| 4 | +import { Effect, PolicyStatement, ServicePrincipal } from 'aws-cdk-lib/aws-iam'; |
| 5 | +import { Architecture, Code, Function, Runtime } from 'aws-cdk-lib/aws-lambda'; |
| 6 | +import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs'; |
| 7 | +import { AwsCustomResource, AwsCustomResourcePolicy, PhysicalResourceId } from 'aws-cdk-lib/custom-resources'; |
| 8 | +import { Construct } from 'constructs'; |
| 9 | + |
| 10 | +export class LambdaIotCdkStack extends Stack { |
| 11 | + constructor(scope: Construct, id: string, props?: StackProps) { |
| 12 | + super(scope, id, props); |
| 13 | + |
| 14 | + let mqttTopicName = "my/mqtt/topic" |
| 15 | + let mqttTopicRegion = Stack.of(this).region |
| 16 | + let mqttTopicAccount = Stack.of(this).account |
| 17 | + let iotEndpointAddress = this.getIoTEndpoint().getResponseField('endpointAddress') |
| 18 | + |
| 19 | + // Publisher lambda function. |
| 20 | + // Remark - if the Lambda is in the same account and region as the IoT Core endpoint, then setting the endpoint is optional. |
| 21 | + const iotPubPermission = new PolicyStatement(({ |
| 22 | + effect: Effect.ALLOW, |
| 23 | + resources: [ `arn:aws:iot:${mqttTopicRegion}:${mqttTopicAccount}:topic/${mqttTopicName}` ], |
| 24 | + actions: [ "iot:Publish" ] |
| 25 | + })); |
| 26 | + const iotPubLambda = new Function(this, 'iotPubHandler', { |
| 27 | + handler: 'publisher_function.handler', |
| 28 | + code: Code.fromAsset('./src'), |
| 29 | + description: 'This function publishes a message to AWS IoT Core - MTTQ', |
| 30 | + runtime: Runtime.PYTHON_3_12, |
| 31 | + architecture: Architecture.ARM_64, |
| 32 | + logGroup: this.addLogGroup(`/aws/lambda/pub-lambda`), |
| 33 | + environment: { |
| 34 | + MQTT_TOPIC_REGION: mqttTopicRegion, |
| 35 | + MQTT_TOPIC_NAME: mqttTopicName |
| 36 | + } |
| 37 | + }) |
| 38 | + iotPubLambda.addToRolePolicy(iotPubPermission) |
| 39 | + |
| 40 | + // Receiver lambda function |
| 41 | + const iotReceiverPermission = new PolicyStatement(({ |
| 42 | + effect: Effect.ALLOW, |
| 43 | + resources: [ `arn:aws:iot:${mqttTopicRegion}:${mqttTopicAccount}:topic/${mqttTopicName}` ], |
| 44 | + actions: [ |
| 45 | + "iot:Receive" |
| 46 | + ] |
| 47 | + })); |
| 48 | + const iotReceiverLambda = new Function(this, 'iotReceiverHandler', { |
| 49 | + handler: 'receiver_function.handler', |
| 50 | + description: 'This function get invoked by AWS IoT Core through the action-rule', |
| 51 | + code: Code.fromAsset('./src', { |
| 52 | + bundling: { |
| 53 | + image: Runtime.PYTHON_3_12.bundlingImage, |
| 54 | + command: [ |
| 55 | + 'bash', '-c', |
| 56 | + 'pip install -r receiver_requirements.txt -t /asset-output && cp -au . /asset-output' |
| 57 | + ], |
| 58 | + }, |
| 59 | + }), |
| 60 | + runtime: Runtime.PYTHON_3_12, |
| 61 | + architecture: Architecture.ARM_64, |
| 62 | + logGroup: this.addLogGroup(`/aws/lambda/receiver-lambda`) |
| 63 | + }) |
| 64 | + iotReceiverLambda.addToRolePolicy(iotReceiverPermission) |
| 65 | + |
| 66 | + // Topic rule |
| 67 | + const errorLogGroup = new LogGroup(this, 'RuleErrorLogGroup', { |
| 68 | + logGroupName: '/aws/iot/rule-error-logs', |
| 69 | + retention: RetentionDays.FIVE_DAYS, |
| 70 | + removalPolicy: RemovalPolicy.DESTROY |
| 71 | + }) |
| 72 | + let topicRule = new TopicRule(this, 'IoTTopicRule', { |
| 73 | + topicRuleName: 'ProcessIoTMessages', |
| 74 | + description: 'Invokes the lambda function', |
| 75 | + sql: IotSql.fromStringAsVer20160323("SELECT * FROM 'my/mqtt/topic'"), |
| 76 | + actions: [ new LambdaFunctionAction(iotReceiverLambda) ], |
| 77 | + errorAction: new CloudWatchLogsAction(errorLogGroup) |
| 78 | + }) |
| 79 | + |
| 80 | + // Grant permission for AWS IoT to invoke the Lambda function |
| 81 | + const iotServicePrincipal = new ServicePrincipal('iot.amazonaws.com'); |
| 82 | + iotReceiverLambda.grantInvoke(iotServicePrincipal); |
| 83 | + |
| 84 | + // Outputs |
| 85 | + new CfnOutput(this, "IoT Endpoint Address", { |
| 86 | + value: iotEndpointAddress ?? "Error: can't get the IoT Endpoint Address!", |
| 87 | + }); |
| 88 | + } |
| 89 | + |
| 90 | + // Utility function to return a log-group object |
| 91 | + private addLogGroup(logGroupName: string) { |
| 92 | + const retentionDays = RetentionDays.FIVE_DAYS |
| 93 | + const removalPolicy = RemovalPolicy.DESTROY |
| 94 | + const props = { logGroupName, retentionDays, removalPolicy } |
| 95 | + return new LogGroup(this, `${logGroupName}`, props) |
| 96 | + } |
| 97 | + |
| 98 | + // Get the current account IoT-Endpoint |
| 99 | + private getIoTEndpoint() { |
| 100 | + const ioTEndpoint = new AwsCustomResource(this, 'IoTEndpoint', { |
| 101 | + onCreate: { |
| 102 | + service: 'Iot', |
| 103 | + action: 'describeEndpoint', |
| 104 | + physicalResourceId: PhysicalResourceId.fromResponse('endpointAddress'), |
| 105 | + parameters: { |
| 106 | + "endpointType": "iot:Data-ATS" |
| 107 | + } |
| 108 | + }, |
| 109 | + policy: AwsCustomResourcePolicy.fromSdkCalls({resources: AwsCustomResourcePolicy.ANY_RESOURCE}) |
| 110 | + }) |
| 111 | + const IOT_ENDPOINT = ioTEndpoint.getResponseField('endpointAddress') |
| 112 | + return ioTEndpoint |
| 113 | + } |
| 114 | +} |
0 commit comments