diff --git a/integration/combination/test_function_with_msk.py b/integration/combination/test_function_with_msk.py
index 472a128f4e..9adb00c6d4 100644
--- a/integration/combination/test_function_with_msk.py
+++ b/integration/combination/test_function_with_msk.py
@@ -41,14 +41,20 @@ def test_function_with_msk_trigger_and_s3_onfailure_events_destinations(self):
             "combination/function_with_msk_trigger_and_s3_onfailure_events_destinations", parameters
         )
 
-    def test_function_with_msk_trigger_and_confluent_schema_registry(self):
+    def test_function_with_msk_trigger_and_premium_features(self):
         companion_stack_outputs = self.companion_stack_outputs
         parameters = self.get_parameters(companion_stack_outputs)
         cluster_name = "MskCluster4-" + generate_suffix()
         parameters.append(self.generate_parameter("MskClusterName4", cluster_name))
-        self._common_validations_for_MSK(
-            "combination/function_with_msk_trigger_and_confluent_schema_registry", parameters
+        event_source_mapping_result = self._common_validations_for_MSK(
+            "combination/function_with_msk_trigger_and_premium_features", parameters
         )
+
+        # Verify error handling properties are correctly set
+        self.assertTrue(event_source_mapping_result.get("BisectBatchOnFunctionError"))
+        self.assertEqual(event_source_mapping_result.get("MaximumRecordAgeInSeconds"), 3600)
+        self.assertEqual(event_source_mapping_result.get("MaximumRetryAttempts"), 3)
+        self.assertEqual(event_source_mapping_result.get("FunctionResponseTypes"), ["ReportBatchItemFailures"])
 
     def _common_validations_for_MSK(self, file_name, parameters):
         self.create_and_verify_stack(file_name, parameters)
@@ -74,6 +80,7 @@ def _common_validations_for_MSK(self, file_name, parameters):
 
         self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
         self.assertEqual(event_source_mapping_kafka_cluster_arn, msk_cluster_arn)
+        return event_source_mapping_result
 
     def get_parameters(self, dictionary):
         parameters = []
diff --git a/integration/combination/test_function_with_self_managed_kafka.py b/integration/combination/test_function_with_self_managed_kafka.py
index ef97d3d25f..33249f5fdc 100644
--- a/integration/combination/test_function_with_self_managed_kafka.py
+++ b/integration/combination/test_function_with_self_managed_kafka.py
@@ -16,7 +16,6 @@ class TestFunctionWithSelfManagedKafka(BaseTest):
     @pytest.mark.flaky(reruns=5)
     @parameterized.expand(
         [
-            "combination/function_with_self_managed_kafka",
             "combination/function_with_self_managed_kafka_intrinsics",
         ]
     )
@@ -30,3 +29,29 @@ def test_function_with_self_managed_kafka(self, file_name):
         event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_id)
         event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
         self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
+
+    @parameterized.expand(["combination/function_with_self_managed_kafka"])
+    def test_function_with_self_managed_kafka_with_provisioned_mode(self, file_name):
+        self.create_and_verify_stack(file_name)
+        # Get the notification configuration and make sure Lambda Function connection is added
+        lambda_client = self.client_provider.lambda_client
+        function_name = self.get_physical_id_by_type("AWS::Lambda::Function")
+        lambda_function_arn = lambda_client.get_function_configuration(FunctionName=function_name)["FunctionArn"]
+        event_source_mapping_id = self.get_physical_id_by_type("AWS::Lambda::EventSourceMapping")
+        event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_id)
+        event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
+        self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
+
+        # Verify error handling properties are correctly set
+        self.assertTrue(event_source_mapping_result.get("BisectBatchOnFunctionError"))
+        self.assertEqual(event_source_mapping_result.get("MaximumRecordAgeInSeconds"), 3600)
+        self.assertEqual(event_source_mapping_result.get("MaximumRetryAttempts"), 3)
+        self.assertEqual(event_source_mapping_result.get("FunctionResponseTypes"), ["ReportBatchItemFailures"])
+        # Uncomment this once SDK is updated.
+        # provisioned_poller_config = event_source_mapping_result["ProvisionedPollerConfig"]
+        # actual_poller_group_name = provisioned_poller_config["PollerGroupName"]
+        # self.assertEqual(
+        #     actual_poller_group_name,
+        #     "test1",
+        #     f"Expected PollerGroupName to be 'test1' but got '{actual_poller_group_name}'",
+        # )
diff --git a/integration/resources/expected/combination/function_with_msk_trigger_and_confluent_schema_registry.json b/integration/resources/expected/combination/function_with_msk_trigger_and_premium_features.json
similarity index 100%
rename from integration/resources/expected/combination/function_with_msk_trigger_and_confluent_schema_registry.json
rename to integration/resources/expected/combination/function_with_msk_trigger_and_premium_features.json
diff --git a/integration/resources/templates/combination/function_with_msk_trigger_and_confluent_schema_registry.yaml b/integration/resources/templates/combination/function_with_msk_trigger_and_premium_features.yaml
similarity index 88%
rename from integration/resources/templates/combination/function_with_msk_trigger_and_confluent_schema_registry.yaml
rename to integration/resources/templates/combination/function_with_msk_trigger_and_premium_features.yaml
index 44fad6ec90..90de1dbd90 100644
--- a/integration/resources/templates/combination/function_with_msk_trigger_and_confluent_schema_registry.yaml
+++ b/integration/resources/templates/combination/function_with_msk_trigger_and_premium_features.yaml
@@ -66,8 +66,17 @@ Resources:
               Ref: MyMskCluster
             Topics:
               - SchemaRegistryTestTopic
+            DestinationConfig:
+              OnFailure:
+                Type: Kafka
+                Destination: kafka://testTopic
             ProvisionedPollerConfig:
               MinimumPollers: 1
+            BisectBatchOnFunctionError: true
+            MaximumRecordAgeInSeconds: 3600
+            MaximumRetryAttempts: 3
+            FunctionResponseTypes:
+              - ReportBatchItemFailures
             SchemaRegistryConfig:
               AccessConfigs:
                 - Type: BASIC_AUTH
diff --git a/integration/resources/templates/combination/function_with_self_managed_kafka.yaml b/integration/resources/templates/combination/function_with_self_managed_kafka.yaml
index 8bf399c49b..0695ba3e0a 100644
--- a/integration/resources/templates/combination/function_with_self_managed_kafka.yaml
+++ b/integration/resources/templates/combination/function_with_self_managed_kafka.yaml
@@ -15,10 +15,18 @@ Resources:
               - 123.45.67.89:9096
             Topics:
               - Topic1
+            ProvisionedPollerConfig:
+              MinimumPollers: 1
+              PollerGroupName: test1
             SourceAccessConfigurations:
               - Type: BASIC_AUTH
                 URI:
                   Ref: KafkaUserSecret
+            BisectBatchOnFunctionError: true
+            MaximumRecordAgeInSeconds: 3600
+            MaximumRetryAttempts: 3
+            FunctionResponseTypes:
+              - ReportBatchItemFailures
 
   KafkaUserSecret:
     Type: AWS::SecretsManager::Secret
diff --git a/samtranslator/__init__.py b/samtranslator/__init__.py
index a0a65a3fb1..1e7114c2e0 100644
--- a/samtranslator/__init__.py
+++ b/samtranslator/__init__.py
@@ -1 +1 @@
-__version__ = "1.102.0" +__version__ = "1.103.0" diff --git a/samtranslator/internal/schema_source/aws_serverless_function.py b/samtranslator/internal/schema_source/aws_serverless_function.py index cca8559bcf..e40d193b4a 100644 --- a/samtranslator/internal/schema_source/aws_serverless_function.py +++ b/samtranslator/internal/schema_source/aws_serverless_function.py @@ -423,6 +423,10 @@ class MSKEventProperties(BaseModel): DestinationConfig: Optional[PassThroughProp] # TODO: add documentation ProvisionedPollerConfig: Optional[PassThroughProp] SchemaRegistryConfig: Optional[PassThroughProp] + BisectBatchOnFunctionError: Optional[PassThroughProp] = mskeventproperties("BisectBatchOnFunctionError") + FunctionResponseTypes: Optional[PassThroughProp] = mskeventproperties("FunctionResponseTypes") + MaximumRecordAgeInSeconds: Optional[PassThroughProp] = mskeventproperties("MaximumRecordAgeInSeconds") + MaximumRetryAttempts: Optional[PassThroughProp] = mskeventproperties("MaximumRetryAttempts") class MSKEvent(BaseModel): @@ -463,6 +467,12 @@ class SelfManagedKafkaEventProperties(BaseModel): Topics: PassThroughProp = selfmanagedkafkaeventproperties("Topics") ProvisionedPollerConfig: Optional[PassThroughProp] SchemaRegistryConfig: Optional[PassThroughProp] + BisectBatchOnFunctionError: Optional[PassThroughProp] = selfmanagedkafkaeventproperties( + "BisectBatchOnFunctionError" + ) + MaximumRecordAgeInSeconds: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("MaximumRecordAgeInSeconds") + MaximumRetryAttempts: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("MaximumRetryAttempts") + FunctionResponseTypes: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("FunctionResponseTypes") class SelfManagedKafkaEvent(BaseModel): diff --git a/samtranslator/internal/schema_source/sam-docs.json b/samtranslator/internal/schema_source/sam-docs.json index 9b5db688a6..464620c1af 100644 --- a/samtranslator/internal/schema_source/sam-docs.json +++ b/samtranslator/internal/schema_source/sam-docs.json @@ -298,7 +298,11 @@ "StartingPosition": "The position in a stream from which to start reading\\. \n+ `AT_TIMESTAMP` \u2013 Specify a time from which to start reading records\\.\n+ `LATEST` \u2013 Read only new records\\.\n+ `TRIM_HORIZON` \u2013 Process all available records\\.\n*Valid values*: `AT_TIMESTAMP` \\| `LATEST` \\| `TRIM_HORIZON` \n*Type*: String \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "StartingPositionTimestamp": "The time from which to start reading, in Unix time seconds\\. Define `StartingPositionTimestamp` when `StartingPosition` is specified as `AT_TIMESTAMP`\\. \n*Type*: Double \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPositionTimestamp`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingpositiontimestamp) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "Stream": "The Amazon Resource Name \\(ARN\\) of the data stream or a stream consumer\\. 
\n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`EventSourceArn`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-eventsourcearn) property of an `AWS::Lambda::EventSourceMapping` resource\\.", - "Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\." + "Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "BisectBatchOnFunctionError": "If the function returns an error, split the batch in two and retry\\. \n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "FunctionResponseTypes": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "MaximumRecordAgeInSeconds": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "MaximumRetryAttempts": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\." }, "sam-property-function-onfailure": { "Destination": "The Amazon Resource Name \\(ARN\\) of the destination resource\\. 
\n*Type*: String \n*Required*: Conditional \n*AWS CloudFormation compatibility*: This property is similar to the [`OnFailure`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventinvokeconfig-destinationconfig-onfailure.html#cfn-lambda-eventinvokeconfig-destinationconfig-onfailure-destination) property of an `AWS::Lambda::EventInvokeConfig` resource\\. SAM will add any necessary permissions to the auto\\-generated IAM Role associated with this function to access the resource referenced in this property\\. \n*Additional notes*: If the type is Lambda/EventBridge, Destination is required\\.", @@ -379,7 +383,11 @@ "SourceAccessConfigurations": "An array of the authentication protocol, VPC components, or virtual host to secure and define your event source\\. \n*Valid values*: `BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE` \n*Type*: List of [SourceAccessConfiguration](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html) \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the `[ SourceAccessConfigurations](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-sourceaccessconfigurations)` property of an `AWS::Lambda::EventSourceMapping` resource\\.", "StartingPosition": "The position in a stream from which to start reading\\. \n+ `AT_TIMESTAMP` \u2013 Specify a time from which to start reading records\\.\n+ `LATEST` \u2013 Read only new records\\.\n+ `TRIM_HORIZON` \u2013 Process all available records\\.\n*Valid values*: `AT_TIMESTAMP` \\| `LATEST` \\| `TRIM_HORIZON` \n*Type*: String \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "StartingPositionTimestamp": "The time from which to start reading, in Unix time seconds\\. Define `StartingPositionTimestamp` when `StartingPosition` is specified as `AT_TIMESTAMP`\\. \n*Type*: Double \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPositionTimestamp`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingpositiontimestamp) property of an `AWS::Lambda::EventSourceMapping` resource\\.", - "Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\." + "Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "BisectBatchOnFunctionError": "If the function returns an error, split the batch in two and retry\\. 
\n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "FunctionResponseTypes": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "MaximumRecordAgeInSeconds": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "MaximumRetryAttempts": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\." }, "sam-property-function-sns": { "FilterPolicy": "The filter policy JSON assigned to the subscription\\. For more information, see [GetSubscriptionAttributes](https://docs.aws.amazon.com/sns/latest/api/API_GetSubscriptionAttributes.html) in the Amazon Simple Notification Service API Reference\\. \n*Type*: [SnsFilterPolicy](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-subscription.html#cfn-sns-subscription-filterpolicy) \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FilterPolicy`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-subscription.html#cfn-sns-subscription-filterpolicy) property of an `AWS::SNS::Subscription` resource\\.", diff --git a/samtranslator/model/eventsources/pull.py b/samtranslator/model/eventsources/pull.py index 5845865b72..6e3a7d7782 100644 --- a/samtranslator/model/eventsources/pull.py +++ b/samtranslator/model/eventsources/pull.py @@ -205,10 +205,10 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P if destination_type: # delete this field as its used internally for SAM to determine the policy del on_failure["Type"] - # the values 'SQS', 'SNS', and 'S3' are allowed. No intrinsics are allowed - if destination_type not in ["SQS", "SNS", "S3"]: + # the values 'SQS', 'SNS', 'S3', and 'Kafka' are allowed. 
No intrinsics are allowed + if destination_type not in ["SQS", "SNS", "S3", "Kafka"]: raise InvalidEventException( - self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'" + self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', 'S3', and 'Kafka'" ) if destination_type == "SQS": queue_arn = on_failure.get("Destination") @@ -225,6 +225,9 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P destination_config_policy = IAMRolePolicies().s3_send_event_payload_role_policy( s3_arn, self.logical_id ) + elif destination_type == "Kafka": + # No policy generation for Kafka destinations - pass through + pass lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig diff --git a/samtranslator/schema/schema.json b/samtranslator/schema/schema.json index 330a678439..3df8ee7b6b 100644 --- a/samtranslator/schema/schema.json +++ b/samtranslator/schema/schema.json @@ -275783,6 +275783,15 @@ "MSKEventProperties": { "additionalProperties": false, "properties": { + "BisectBatchOnFunctionError": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "If the function returns an error, split the batch in two and retry\\. \n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "BisectBatchOnFunctionError" + }, "ConsumerGroupId": { "allOf": [ { @@ -275807,6 +275816,15 @@ "markdownDescription": "A object that defines the criteria that determines whether Lambda should process an event\\. For more information, see [AWS Lambda event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) in the *AWS Lambda Developer Guide*\\. \n*Type*: [FilterCriteria](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html) \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FilterCriteria`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "title": "FilterCriteria" }, + "FunctionResponseTypes": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "FunctionResponseTypes" + }, "KmsKeyArn": { "$ref": "#/definitions/PassThroughProp" }, @@ -275819,6 +275837,24 @@ "markdownDescription": "The maximum amount of time to gather records before invoking the function, in seconds\\. 
\n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumBatchingWindowInSeconds`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumbatchingwindowinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "title": "MaximumBatchingWindowInSeconds" }, + "MaximumRecordAgeInSeconds": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "MaximumRecordAgeInSeconds" + }, + "MaximumRetryAttempts": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "MaximumRetryAttempts" + }, "ProvisionedPollerConfig": { "$ref": "#/definitions/PassThroughProp" }, @@ -276622,6 +276658,15 @@ "markdownDescription": "The maximum number of records in each batch that Lambda pulls from your stream and sends to your function\\. \n*Type*: Integer \n*Required*: No \n*Default*: 100 \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BatchSize`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-batchsize) property of an `AWS::Lambda::EventSourceMapping` resource\\. \n*Minimum*: `1` \n*Maximum*: `10000`", "title": "BatchSize" }, + "BisectBatchOnFunctionError": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "If the function returns an error, split the batch in two and retry\\. \n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "BisectBatchOnFunctionError" + }, "ConsumerGroupId": { "allOf": [ { @@ -276649,6 +276694,15 @@ "markdownDescription": "A object that defines the criteria to determine whether Lambda should process an event\\. For more information, see [AWS Lambda event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) in the *AWS Lambda Developer Guide*\\. 
\n*Type*: [FilterCriteria](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html) \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FilterCriteria`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "title": "FilterCriteria" }, + "FunctionResponseTypes": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "FunctionResponseTypes" + }, "KafkaBootstrapServers": { "items": { "anyOf": [ @@ -276667,6 +276721,24 @@ "KmsKeyArn": { "$ref": "#/definitions/PassThroughProp" }, + "MaximumRecordAgeInSeconds": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "MaximumRecordAgeInSeconds" + }, + "MaximumRetryAttempts": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "MaximumRetryAttempts" + }, "ProvisionedPollerConfig": { "$ref": "#/definitions/PassThroughProp" }, diff --git a/schema_source/sam.schema.json b/schema_source/sam.schema.json index 4f2a396080..e366c0c3a8 100644 --- a/schema_source/sam.schema.json +++ b/schema_source/sam.schema.json @@ -2402,6 +2402,15 @@ "MSKEventProperties": { "additionalProperties": false, "properties": { + "BisectBatchOnFunctionError": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "If the function returns an error, split the batch in two and retry\\. 
\n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "BisectBatchOnFunctionError" + }, "ConsumerGroupId": { "allOf": [ { @@ -2426,6 +2435,15 @@ "markdownDescription": "A object that defines the criteria that determines whether Lambda should process an event\\. For more information, see [AWS Lambda event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) in the *AWS Lambda Developer Guide*\\. \n*Type*: [FilterCriteria](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html) \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FilterCriteria`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "title": "FilterCriteria" }, + "FunctionResponseTypes": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "FunctionResponseTypes" + }, "KmsKeyArn": { "$ref": "#/definitions/PassThroughProp" }, @@ -2438,6 +2456,24 @@ "markdownDescription": "The maximum amount of time to gather records before invoking the function, in seconds\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumBatchingWindowInSeconds`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumbatchingwindowinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "title": "MaximumBatchingWindowInSeconds" }, + "MaximumRecordAgeInSeconds": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "MaximumRecordAgeInSeconds" + }, + "MaximumRetryAttempts": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "The maximum number of times to retry when the function returns an error\\. 
\n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "MaximumRetryAttempts" + }, "ProvisionedPollerConfig": { "$ref": "#/definitions/PassThroughProp" }, @@ -3172,6 +3208,15 @@ "markdownDescription": "The maximum number of records in each batch that Lambda pulls from your stream and sends to your function\\. \n*Type*: Integer \n*Required*: No \n*Default*: 100 \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BatchSize`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-batchsize) property of an `AWS::Lambda::EventSourceMapping` resource\\. \n*Minimum*: `1` \n*Maximum*: `10000`", "title": "BatchSize" }, + "BisectBatchOnFunctionError": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "If the function returns an error, split the batch in two and retry\\. \n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "BisectBatchOnFunctionError" + }, "ConsumerGroupId": { "allOf": [ { @@ -3199,6 +3244,15 @@ "markdownDescription": "A object that defines the criteria to determine whether Lambda should process an event\\. For more information, see [AWS Lambda event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) in the *AWS Lambda Developer Guide*\\. \n*Type*: [FilterCriteria](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html) \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FilterCriteria`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html) property of an `AWS::Lambda::EventSourceMapping` resource\\.", "title": "FilterCriteria" }, + "FunctionResponseTypes": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. 
\n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "FunctionResponseTypes" + }, "KafkaBootstrapServers": { "items": { "anyOf": [ @@ -3217,6 +3271,24 @@ "KmsKeyArn": { "$ref": "#/definitions/PassThroughProp" }, + "MaximumRecordAgeInSeconds": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "MaximumRecordAgeInSeconds" + }, + "MaximumRetryAttempts": { + "allOf": [ + { + "$ref": "#/definitions/PassThroughProp" + } + ], + "markdownDescription": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\.", + "title": "MaximumRetryAttempts" + }, "ProvisionedPollerConfig": { "$ref": "#/definitions/PassThroughProp" }, diff --git a/tests/model/eventsources/test_msk_event_source.py b/tests/model/eventsources/test_msk_event_source.py index 87467eaccd..21850dffd1 100644 --- a/tests/model/eventsources/test_msk_event_source.py +++ b/tests/model/eventsources/test_msk_event_source.py @@ -1,7 +1,9 @@ from unittest import TestCase +from unittest.mock import Mock from samtranslator.intrinsics.resolver import IntrinsicsResolver from samtranslator.model.eventsources.pull import MSK +from samtranslator.model.lambda_ import LambdaFunction class MSKEventSource(TestCase): @@ -184,3 +186,155 @@ def test_validate_schema_registry_config_when_access_config_is_empty(self): with self.assertRaises(Exception) as context: self.kafka_event_source.validate_schema_registry_config() self.assertTrue("AccessConfigs in SchemaRegistryConfig must be a list" in str(context.exception)) + + def test_to_cloudformation_with_kafka_destination_type_acceptance(self): + """Test that MSK accepts 'Kafka' as a valid destination type and no additional IAM policies are generated.""" + # Set up MSK event source with Kafka destination + self.kafka_event_source.DestinationConfig = { + "OnFailure": {"Type": "Kafka", "Destination": "kafka://failure-topic"} + } + + # Mock function + mock_function = Mock() + mock_function.get_runtime_attr.return_value = "test-function" + mock_function.get_passthrough_resource_attributes.return_value = {} + + # Mock role + mock_role = Mock() + mock_role.ManagedPolicyArns = [] + mock_role.Policies = None + + # Call to_cloudformation + resources = self.kafka_event_source.to_cloudformation(function=mock_function, role=mock_role) + + # Verify that the method completes without raising an exception + 
self.assertIsNotNone(resources) + self.assertEqual(len(resources), 1) + + # Verify that the EventSourceMapping resource is created + event_source_mapping = resources[0] + self.assertEqual(event_source_mapping.resource_type, "AWS::Lambda::EventSourceMapping") + + # Verify that DestinationConfig is set correctly (without the Type field) + expected_destination_config = {"OnFailure": {"Destination": "kafka://failure-topic"}} + self.assertEqual(event_source_mapping.DestinationConfig, expected_destination_config) + + # Verify that no additional IAM policies were added for Kafka destination + # The role should only have the base MSK execution role policy + expected_base_policy = "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + self.assertIn(expected_base_policy, mock_role.ManagedPolicyArns) + + # Verify no destination-specific policies were added + if mock_role.Policies: + # Check that no SQS, SNS, or S3 policies were added + for policy in mock_role.Policies: + policy_doc = policy.get("PolicyDocument", {}) + statements = policy_doc.get("Statement", []) + for statement in statements: + actions = statement.get("Action", []) + # Ensure no destination-specific actions are present + destination_actions = ["sqs:SendMessage", "sns:Publish", "s3:PutObject"] + for action in actions: + self.assertNotIn(action, destination_actions) + + def test_to_cloudformation_with_kafka_destination_config_pass_through(self): + """Test that DestinationConfig is passed through unchanged and kafka:// URI is preserved.""" + # Set up MSK event source with Kafka destination using kafka:// URI + kafka_uri = "kafka://my-kafka-cluster:9092/failure-topic" + self.kafka_event_source.DestinationConfig = {"OnFailure": {"Type": "Kafka", "Destination": kafka_uri}} + + # Mock function + mock_function = Mock() + mock_function.get_runtime_attr.return_value = "test-function" + mock_function.get_passthrough_resource_attributes.return_value = {} + + # Mock role + mock_role = Mock() + mock_role.ManagedPolicyArns = [] + mock_role.Policies = None + + # Call to_cloudformation + resources = self.kafka_event_source.to_cloudformation(function=mock_function, role=mock_role) + + # Verify that the EventSourceMapping resource is created + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + + # Verify that DestinationConfig is passed through with the kafka:// URI preserved + # The Type field should be removed, but Destination should remain unchanged + expected_destination_config = {"OnFailure": {"Destination": kafka_uri}} + self.assertEqual(event_source_mapping.DestinationConfig, expected_destination_config) + + # Verify that the original kafka:// URI is preserved exactly as provided + actual_destination = event_source_mapping.DestinationConfig["OnFailure"]["Destination"] + self.assertEqual(actual_destination, kafka_uri) + + # Verify that the URI format is not modified or validated + self.assertTrue(actual_destination.startswith("kafka://")) + self.assertIn("my-kafka-cluster:9092/failure-topic", actual_destination) + + def test_to_cloudformation_with_all_error_handling_properties(self): + self.kafka_event_source.Stream = "arn:aws:kafka:us-east-1:123456789012:cluster/test/abc" + self.kafka_event_source.StartingPosition = "LATEST" + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.BisectBatchOnFunctionError = True + self.kafka_event_source.MaximumRecordAgeInSeconds = 3600 + self.kafka_event_source.MaximumRetryAttempts = 3 + self.kafka_event_source.FunctionResponseTypes = 
["ReportBatchItemFailures"] + + mock_function = LambdaFunction("TestFunction") + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + self.assertEqual(event_source_mapping.BisectBatchOnFunctionError, True) + self.assertEqual(event_source_mapping.MaximumRecordAgeInSeconds, 3600) + self.assertEqual(event_source_mapping.MaximumRetryAttempts, 3) + self.assertEqual(event_source_mapping.FunctionResponseTypes, ["ReportBatchItemFailures"]) + + def test_to_cloudformation_without_error_handling_properties(self): + self.kafka_event_source.Stream = "arn:aws:kafka:us-east-1:123456789012:cluster/test/abc" + self.kafka_event_source.StartingPosition = "LATEST" + self.kafka_event_source.Topics = ["test-topic"] + + mock_function = LambdaFunction("TestFunction") + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + self.assertIsNone(event_source_mapping.BisectBatchOnFunctionError) + self.assertIsNone(event_source_mapping.MaximumRecordAgeInSeconds) + self.assertIsNone(event_source_mapping.MaximumRetryAttempts) + self.assertIsNone(event_source_mapping.FunctionResponseTypes) + + def test_to_cloudformation_with_provisioned_poller_config_including_poller_group_name(self): + """Test that PollerGroupName is correctly passed through in ProvisionedPollerConfig""" + + # Set up the MSK event source with ProvisionedPollerConfig including PollerGroupName + self.kafka_event_source.Stream = "arn:aws:kafka:us-west-2:012345678901:cluster/mycluster/abc123" + self.kafka_event_source.StartingPosition = "LATEST" + self.kafka_event_source.Topics = ["MyTopic"] + self.kafka_event_source.ProvisionedPollerConfig = { + "MinimumPollers": 5, + "MaximumPollers": 10, + "PollerGroupName": "my-poller-group", + } + + # Create a mock function + function = Mock() + function.get_runtime_attr = Mock() + function.get_runtime_attr.return_value = "arn:aws:lambda:mock" + function.resource_attributes = {} + function.get_passthrough_resource_attributes = Mock() + function.get_passthrough_resource_attributes.return_value = {} + + # Convert to CloudFormation + resources = self.kafka_event_source.to_cloudformation(function=function) + + # Verify that the EventSourceMapping resource is created + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + + # Verify that ProvisionedPollerConfig is correctly set with all properties including PollerGroupName + expected_config = {"MinimumPollers": 5, "MaximumPollers": 10, "PollerGroupName": "my-poller-group"} + self.assertEqual(event_source_mapping.ProvisionedPollerConfig, expected_config) diff --git a/tests/model/eventsources/test_self_managed_kafka_event_source.py b/tests/model/eventsources/test_self_managed_kafka_event_source.py index 4a2dc6f15e..4c00653fb8 100644 --- a/tests/model/eventsources/test_self_managed_kafka_event_source.py +++ b/tests/model/eventsources/test_self_managed_kafka_event_source.py @@ -1,9 +1,11 @@ from unittest import TestCase +from unittest.mock import Mock from parameterized import parameterized from samtranslator.intrinsics.resolver import IntrinsicsResolver from samtranslator.model.eventsources.pull import SelfManagedKafka from samtranslator.model.exceptions import InvalidEventException +from samtranslator.model.lambda_ import LambdaFunction class SelfManagedKafkaEventSource(TestCase): @@ -473,3 +475,237 @@ def 
test_get_policy_statements_with_non_glue_schema_registry(self): self.assertEqual(len(policy_statements[0]["PolicyDocument"]["Statement"]), len(expected_statements)) for statement in expected_statements: self.assertIn(statement, policy_statements[0]["PolicyDocument"]["Statement"]) + + def test_to_cloudformation_with_kafka_destination_type_acceptance(self): + """Test that SelfManagedKafka accepts 'Kafka' as a valid destination type and no additional IAM policies are generated.""" + # Set up SelfManagedKafka event source with Kafka destination + self.kafka_event_source.DestinationConfig = { + "OnFailure": {"Type": "Kafka", "Destination": "kafka://failure-topic"} + } + + # Set up required properties for SelfManagedKafka + self.kafka_event_source.KafkaBootstrapServers = ["localhost:9092"] + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.SourceAccessConfigurations = [ + {"Type": "BASIC_AUTH", "URI": "SECRET_URI"}, + {"Type": "VPC_SUBNET", "URI": "SECRET_URI"}, + {"Type": "VPC_SECURITY_GROUP", "URI": "SECRET_URI"}, + ] + + # Mock function + mock_function = Mock() + mock_function.get_runtime_attr.return_value = "test-function" + mock_function.get_passthrough_resource_attributes.return_value = {} + + # Mock role + mock_role = Mock() + mock_role.ManagedPolicyArns = [] + mock_role.Policies = None + + # Call to_cloudformation + resources = self.kafka_event_source.to_cloudformation(function=mock_function, role=mock_role) + + # Verify that the method completes without raising an exception + self.assertIsNotNone(resources) + self.assertEqual(len(resources), 1) + + # Verify that the EventSourceMapping resource is created + event_source_mapping = resources[0] + self.assertEqual(event_source_mapping.resource_type, "AWS::Lambda::EventSourceMapping") + + # Verify that DestinationConfig is set correctly (without the Type field) + expected_destination_config = {"OnFailure": {"Destination": "kafka://failure-topic"}} + self.assertEqual(event_source_mapping.DestinationConfig, expected_destination_config) + + # Verify that no additional IAM policies were added for Kafka destination + # SelfManagedKafka doesn't have a base managed policy, so ManagedPolicyArns should remain empty + self.assertEqual(len(mock_role.ManagedPolicyArns), 0) + + # Verify no destination-specific policies were added + if mock_role.Policies: + # Check that no SQS, SNS, or S3 policies were added + for policy in mock_role.Policies: + policy_doc = policy.get("PolicyDocument", {}) + statements = policy_doc.get("Statement", []) + for statement in statements: + actions = statement.get("Action", []) + # Ensure no destination-specific actions are present + destination_actions = ["sqs:SendMessage", "sns:Publish", "s3:PutObject"] + for action in actions: + self.assertNotIn(action, destination_actions) + + def test_to_cloudformation_with_kafka_destination_config_pass_through(self): + """Test that DestinationConfig is passed through unchanged and kafka:// URI is preserved.""" + # Set up SelfManagedKafka event source with Kafka destination using kafka:// URI + kafka_uri = "kafka://my-kafka-cluster:9092/failure-topic" + self.kafka_event_source.DestinationConfig = {"OnFailure": {"Type": "Kafka", "Destination": kafka_uri}} + + # Set up required properties for SelfManagedKafka + self.kafka_event_source.KafkaBootstrapServers = ["localhost:9092"] + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.SourceAccessConfigurations = [ + {"Type": "BASIC_AUTH", "URI": "SECRET_URI"}, + {"Type": "VPC_SUBNET", "URI": 
"SECRET_URI"}, + {"Type": "VPC_SECURITY_GROUP", "URI": "SECRET_URI"}, + ] + + # Mock function + mock_function = Mock() + mock_function.get_runtime_attr.return_value = "test-function" + mock_function.get_passthrough_resource_attributes.return_value = {} + + # Mock role + mock_role = Mock() + mock_role.ManagedPolicyArns = [] + mock_role.Policies = None + + # Call to_cloudformation + resources = self.kafka_event_source.to_cloudformation(function=mock_function, role=mock_role) + + # Verify that the EventSourceMapping resource is created + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + + # Verify that DestinationConfig is passed through with the kafka:// URI preserved + # The Type field should be removed, but Destination should remain unchanged + expected_destination_config = {"OnFailure": {"Destination": kafka_uri}} + self.assertEqual(event_source_mapping.DestinationConfig, expected_destination_config) + + # Verify that the original kafka:// URI is preserved exactly as provided + actual_destination = event_source_mapping.DestinationConfig["OnFailure"]["Destination"] + self.assertEqual(actual_destination, kafka_uri) + + # Verify that the URI format is not modified or validated + self.assertTrue(actual_destination.startswith("kafka://")) + self.assertIn("my-kafka-cluster:9092/failure-topic", actual_destination) + + def test_to_cloudformation_with_bisect_batch_on_function_error(self): + # Test with True value + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.SourceAccessConfigurations = [{"Type": "BASIC_AUTH", "URI": "SECRET_URI"}] + self.kafka_event_source.BisectBatchOnFunctionError = True + + mock_function = LambdaFunction("TestFunction") + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + self.assertEqual(event_source_mapping.BisectBatchOnFunctionError, True) + + # Test with False value + self.kafka_event_source.BisectBatchOnFunctionError = False + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + self.assertEqual(event_source_mapping.BisectBatchOnFunctionError, False) + + def test_to_cloudformation_with_max_record_age_in_seconds(self): + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.SourceAccessConfigurations = [{"Type": "BASIC_AUTH", "URI": "SECRET_URI"}] + self.kafka_event_source.MaximumRecordAgeInSeconds = 3600 + + mock_function = LambdaFunction("TestFunction") + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + self.assertEqual(event_source_mapping.MaximumRecordAgeInSeconds, 3600) + + def test_to_cloudformation_with_max_retry_attempts(self): + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.SourceAccessConfigurations = [{"Type": "BASIC_AUTH", "URI": "SECRET_URI"}] + self.kafka_event_source.MaximumRetryAttempts = 3 + + mock_function = LambdaFunction("TestFunction") + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + 
self.assertEqual(event_source_mapping.MaximumRetryAttempts, 3) + + def test_to_cloudformation_with_function_response_types(self): + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.SourceAccessConfigurations = [{"Type": "BASIC_AUTH", "URI": "SECRET_URI"}] + self.kafka_event_source.FunctionResponseTypes = ["ReportBatchItemFailures"] + + mock_function = LambdaFunction("TestFunction") + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + self.assertEqual(event_source_mapping.FunctionResponseTypes, ["ReportBatchItemFailures"]) + + def test_to_cloudformation_with_all_error_handling_properties(self): + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.SourceAccessConfigurations = [{"Type": "BASIC_AUTH", "URI": "SECRET_URI"}] + self.kafka_event_source.BisectBatchOnFunctionError = True + self.kafka_event_source.MaximumRecordAgeInSeconds = 3600 + self.kafka_event_source.MaximumRetryAttempts = 3 + self.kafka_event_source.FunctionResponseTypes = ["ReportBatchItemFailures"] + + mock_function = LambdaFunction("TestFunction") + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + self.assertEqual(event_source_mapping.BisectBatchOnFunctionError, True) + self.assertEqual(event_source_mapping.MaximumRecordAgeInSeconds, 3600) + self.assertEqual(event_source_mapping.MaximumRetryAttempts, 3) + self.assertEqual(event_source_mapping.FunctionResponseTypes, ["ReportBatchItemFailures"]) + + def test_to_cloudformation_without_error_handling_properties(self): + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.Topics = ["test-topic"] + self.kafka_event_source.SourceAccessConfigurations = [{"Type": "BASIC_AUTH", "URI": "SECRET_URI"}] + + mock_function = LambdaFunction("TestFunction") + resources = self.kafka_event_source.to_cloudformation(function=mock_function) + + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + self.assertIsNone(event_source_mapping.BisectBatchOnFunctionError) + self.assertIsNone(event_source_mapping.MaximumRecordAgeInSeconds) + self.assertIsNone(event_source_mapping.MaximumRetryAttempts) + self.assertIsNone(event_source_mapping.FunctionResponseTypes) + + def test_to_cloudformation_with_provisioned_poller_config_including_poller_group_name(self): + """Test that PollerGroupName is correctly passed through in ProvisionedPollerConfig""" + + # Set up the SelfManagedKafka event source with ProvisionedPollerConfig including PollerGroupName + self.kafka_event_source.KafkaBootstrapServers = ["broker1:9092", "broker2:9092"] + self.kafka_event_source.Topics = ["MyTopic"] + self.kafka_event_source.SourceAccessConfigurations = [ + {"Type": "SASL_SCRAM_256_AUTH", "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-secret"}, + {"Type": "VPC_SUBNET", "URI": "subnet-12345"}, + {"Type": "VPC_SECURITY_GROUP", "URI": "sg-67890"}, + ] + self.kafka_event_source.ProvisionedPollerConfig = { + "MinimumPollers": 3, + "MaximumPollers": 15, + "PollerGroupName": "self-managed-poller-group", + } + + # Create a mock function + function = Mock() + function.get_runtime_attr = Mock() + function.get_runtime_attr.return_value = 
"arn:aws:lambda:mock" + function.resource_attributes = {} + function.get_passthrough_resource_attributes = Mock() + function.get_passthrough_resource_attributes.return_value = {} + + # Convert to CloudFormation + resources = self.kafka_event_source.to_cloudformation(function=function) + + # Verify that the EventSourceMapping resource is created + self.assertEqual(len(resources), 1) + event_source_mapping = resources[0] + + # Verify that ProvisionedPollerConfig is correctly set with all properties including PollerGroupName + expected_config = {"MinimumPollers": 3, "MaximumPollers": 15, "PollerGroupName": "self-managed-poller-group"} + self.assertEqual(event_source_mapping.ProvisionedPollerConfig, expected_config) diff --git a/tests/translator/input/function_with_provisioned_poller_config.yaml b/tests/translator/input/function_with_provisioned_poller_config.yaml index 0b8b92f3c7..73e95a8e78 100644 --- a/tests/translator/input/function_with_provisioned_poller_config.yaml +++ b/tests/translator/input/function_with_provisioned_poller_config.yaml @@ -17,7 +17,44 @@ Resources: ProvisionedPollerConfig: MinimumPollers: 5 MaximumPollers: 10 - MyKafkaCluster: + PollerGroupName: !Sub "${AWS::StackName}-msk-poller-group" + MyMskEventWithKafkaDestination: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + Topics: + - MyDummyTestTopic + ConsumerGroupId: consumergroup1 + DestinationConfig: + OnFailure: + Type: Kafka + Destination: kafka://topic2 + ProvisionedPollerConfig: + MinimumPollers: 5 + MaximumPollers: 10 + MyKafkaEvent: + Type: SelfManagedKafka + Properties: + KafkaBootstrapServers: + - abc.xyz.com:9092 + - 123.45.67.89:9096 + Topics: + - Topic1 + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + - Type: VPC_SUBNET + URI: subnet:subnet-12345 + - Type: VPC_SECURITY_GROUP + URI: security_group:sg-67890 + ConsumerGroupId: consumergroup1 + StartingPosition: AT_TIMESTAMP + StartingPositionTimestamp: 1672560000 + ProvisionedPollerConfig: + MinimumPollers: 2 + MaximumPollers: 100 + MyKafkaEventWithKafkaDestination: Type: SelfManagedKafka Properties: KafkaBootstrapServers: @@ -38,3 +75,4 @@ Resources: ProvisionedPollerConfig: MinimumPollers: 2 MaximumPollers: 100 + PollerGroupName: self-managed-kafka-poller-group diff --git a/tests/translator/output/aws-cn/function_with_provisioned_poller_config.json b/tests/translator/output/aws-cn/function_with_provisioned_poller_config.json index f89bf28e9c..797d954363 100644 --- a/tests/translator/output/aws-cn/function_with_provisioned_poller_config.json +++ b/tests/translator/output/aws-cn/function_with_provisioned_poller_config.json @@ -23,7 +23,7 @@ }, "Type": "AWS::Lambda::Function" }, - "KafkaEventsFunctionMyKafkaCluster": { + "KafkaEventsFunctionMyKafkaEvent": { "Properties": { "FunctionName": { "Ref": "KafkaEventsFunction" @@ -65,6 +65,49 @@ }, "Type": "AWS::Lambda::EventSourceMapping" }, + "KafkaEventsFunctionMyKafkaEventWithKafkaDestination": { + "Properties": { + "FunctionName": { + "Ref": "KafkaEventsFunction" + }, + "ProvisionedPollerConfig": { + "MaximumPollers": 100, + "MinimumPollers": 2, + "PollerGroupName": "self-managed-kafka-poller-group" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": 
"consumergroup1" + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "AT_TIMESTAMP", + "StartingPositionTimestamp": 1672560000, + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, "KafkaEventsFunctionMyMskEvent": { "Properties": { "AmazonManagedKafkaEventSourceConfig": { @@ -76,6 +119,36 @@ "FunctionName": { "Ref": "KafkaEventsFunction" }, + "ProvisionedPollerConfig": { + "MaximumPollers": 10, + "MinimumPollers": 5, + "PollerGroupName": { + "Fn::Sub": "${AWS::StackName}-msk-poller-group" + } + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "KafkaEventsFunctionMyMskEventWithKafkaDestination": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1" + }, + "DestinationConfig": { + "OnFailure": { + "Destination": "kafka://topic2" + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "KafkaEventsFunction" + }, "ProvisionedPollerConfig": { "MaximumPollers": 10, "MinimumPollers": 5 diff --git a/tests/translator/output/aws-us-gov/function_with_provisioned_poller_config.json b/tests/translator/output/aws-us-gov/function_with_provisioned_poller_config.json index 367f70f6f1..7b31295491 100644 --- a/tests/translator/output/aws-us-gov/function_with_provisioned_poller_config.json +++ b/tests/translator/output/aws-us-gov/function_with_provisioned_poller_config.json @@ -23,7 +23,7 @@ }, "Type": "AWS::Lambda::Function" }, - "KafkaEventsFunctionMyKafkaCluster": { + "KafkaEventsFunctionMyKafkaEvent": { "Properties": { "FunctionName": { "Ref": "KafkaEventsFunction" @@ -65,6 +65,49 @@ }, "Type": "AWS::Lambda::EventSourceMapping" }, + "KafkaEventsFunctionMyKafkaEventWithKafkaDestination": { + "Properties": { + "FunctionName": { + "Ref": "KafkaEventsFunction" + }, + "ProvisionedPollerConfig": { + "MaximumPollers": 100, + "MinimumPollers": 2, + "PollerGroupName": "self-managed-kafka-poller-group" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1" + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "AT_TIMESTAMP", + "StartingPositionTimestamp": 1672560000, + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, "KafkaEventsFunctionMyMskEvent": { "Properties": { "AmazonManagedKafkaEventSourceConfig": { @@ -76,6 +119,36 @@ "FunctionName": { "Ref": "KafkaEventsFunction" }, + "ProvisionedPollerConfig": { + "MaximumPollers": 10, + "MinimumPollers": 5, + "PollerGroupName": { + "Fn::Sub": "${AWS::StackName}-msk-poller-group" + } + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + 
"KafkaEventsFunctionMyMskEventWithKafkaDestination": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1" + }, + "DestinationConfig": { + "OnFailure": { + "Destination": "kafka://topic2" + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "KafkaEventsFunction" + }, "ProvisionedPollerConfig": { "MaximumPollers": 10, "MinimumPollers": 5 diff --git a/tests/translator/output/error_function_with_invalid_stream_eventsource_dest_type.json b/tests/translator/output/error_function_with_invalid_stream_eventsource_dest_type.json index 562cb37442..49b797c2b8 100644 --- a/tests/translator/output/error_function_with_invalid_stream_eventsource_dest_type.json +++ b/tests/translator/output/error_function_with_invalid_stream_eventsource_dest_type.json @@ -4,12 +4,12 @@ "Number of errors found: 1. ", "Resource with id [MyFunction] is invalid. ", "Event with id [MyFunctionStreamEvent] is invalid. ", - "The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'" + "The only valid values for 'Type' are 'SQS', 'SNS', 'S3', and 'Kafka'" ], - "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'", + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS', 'SNS', 'S3', and 'Kafka'", "errors": [ { - "errorMessage": "Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'" + "errorMessage": "Resource with id [MyFunction] is invalid. Event with id [MyFunctionStreamEvent] is invalid. 
The only valid values for 'Type' are 'SQS', 'SNS', 'S3', and 'Kafka'" } ] } diff --git a/tests/translator/output/function_with_provisioned_poller_config.json b/tests/translator/output/function_with_provisioned_poller_config.json index 35b8cd51e6..adfcae551b 100644 --- a/tests/translator/output/function_with_provisioned_poller_config.json +++ b/tests/translator/output/function_with_provisioned_poller_config.json @@ -23,7 +23,7 @@ }, "Type": "AWS::Lambda::Function" }, - "KafkaEventsFunctionMyKafkaCluster": { + "KafkaEventsFunctionMyKafkaEvent": { "Properties": { "FunctionName": { "Ref": "KafkaEventsFunction" @@ -65,6 +65,49 @@ }, "Type": "AWS::Lambda::EventSourceMapping" }, + "KafkaEventsFunctionMyKafkaEventWithKafkaDestination": { + "Properties": { + "FunctionName": { + "Ref": "KafkaEventsFunction" + }, + "ProvisionedPollerConfig": { + "MaximumPollers": 100, + "MinimumPollers": 2, + "PollerGroupName": "self-managed-kafka-poller-group" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1" + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "AT_TIMESTAMP", + "StartingPositionTimestamp": 1672560000, + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, "KafkaEventsFunctionMyMskEvent": { "Properties": { "AmazonManagedKafkaEventSourceConfig": { @@ -76,6 +119,36 @@ "FunctionName": { "Ref": "KafkaEventsFunction" }, + "ProvisionedPollerConfig": { + "MaximumPollers": 10, + "MinimumPollers": 5, + "PollerGroupName": { + "Fn::Sub": "${AWS::StackName}-msk-poller-group" + } + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "KafkaEventsFunctionMyMskEventWithKafkaDestination": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1" + }, + "DestinationConfig": { + "OnFailure": { + "Destination": "kafka://topic2" + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "KafkaEventsFunction" + }, "ProvisionedPollerConfig": { "MaximumPollers": 10, "MinimumPollers": 5