Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions integration/combination/test_function_with_msk.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ def test_function_with_msk_trigger_and_s3_onfailure_events_destinations(self):
"combination/function_with_msk_trigger_and_s3_onfailure_events_destinations", parameters
)

def test_function_with_msk_trigger_and_confluent_schema_registry(self):
def test_function_with_msk_trigger_and_premium_features(self):
companion_stack_outputs = self.companion_stack_outputs
parameters = self.get_parameters(companion_stack_outputs)
cluster_name = "MskCluster4-" + generate_suffix()
parameters.append(self.generate_parameter("MskClusterName4", cluster_name))
self._common_validations_for_MSK(
self._common_validations_for_MSK("combination/function_with_msk_trigger_and_premium_features", parameters)
event_source_mapping_result = self._common_validations_for_MSK(
"combination/function_with_msk_trigger_and_confluent_schema_registry", parameters
)
# Verify error handling properties are correctly set
self.assertTrue(event_source_mapping_result.get("BisectBatchOnFunctionError"))
self.assertEqual(event_source_mapping_result.get("MaximumRecordAgeInSeconds"), 3600)
self.assertEqual(event_source_mapping_result.get("MaximumRetryAttempts"), 3)
self.assertEqual(event_source_mapping_result.get("FunctionResponseTypes"), ["ReportBatchItemFailures"])

def _common_validations_for_MSK(self, file_name, parameters):
self.create_and_verify_stack(file_name, parameters)
Expand All @@ -74,6 +80,7 @@ def _common_validations_for_MSK(self, file_name, parameters):

self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
self.assertEqual(event_source_mapping_kafka_cluster_arn, msk_cluster_arn)
return event_source_mapping_result

def get_parameters(self, dictionary):
parameters = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class TestFunctionWithSelfManagedKafka(BaseTest):
@pytest.mark.flaky(reruns=5)
@parameterized.expand(
[
"combination/function_with_self_managed_kafka",
"combination/function_with_self_managed_kafka_intrinsics",
]
)
Expand All @@ -30,3 +29,29 @@ def test_function_with_self_managed_kafka(self, file_name):
event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_id)
event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)

@parameterized.expand(["combination/function_with_self_managed_kafka"])
def test_function_with_self_managed_kafka_with_provisioned_mode(self, file_name):
self.create_and_verify_stack(file_name)
# Get the notification configuration and make sure Lambda Function connection is added
lambda_client = self.client_provider.lambda_client
function_name = self.get_physical_id_by_type("AWS::Lambda::Function")
lambda_function_arn = lambda_client.get_function_configuration(FunctionName=function_name)["FunctionArn"]
event_source_mapping_id = self.get_physical_id_by_type("AWS::Lambda::EventSourceMapping")
event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_id)
event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)

# Verify error handling properties are correctly set
self.assertTrue(event_source_mapping_result.get("BisectBatchOnFunctionError"))
self.assertEqual(event_source_mapping_result.get("MaximumRecordAgeInSeconds"), 3600)
self.assertEqual(event_source_mapping_result.get("MaximumRetryAttempts"), 3)
self.assertEqual(event_source_mapping_result.get("FunctionResponseTypes"), ["ReportBatchItemFailures"])
# Uncomment this once SDK is updated.
# provisioned_poller_config = event_source_mapping_result["ProvisionedPollerConfig"]
# actual_poller_group_name = provisioned_poller_config["PollerGroupName"]
# self.assertEqual(
# actual_poller_group_name,
# "test1",
# f"Expected PollerGroupName to be 'test1' but got '{actual_poller_group_name}'",
# )
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,17 @@ Resources:
Ref: MyMskCluster
Topics:
- SchemaRegistryTestTopic
DestinationConfig:
OnFailure:
Type: Kafka
Destination: kafka://testTopic
ProvisionedPollerConfig:
MinimumPollers: 1
BisectBatchOnFunctionError: true
MaximumRecordAgeInSeconds: 3600
MaximumRetryAttempts: 3
FunctionResponseTypes:
- ReportBatchItemFailures
SchemaRegistryConfig:
AccessConfigs:
- Type: BASIC_AUTH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ Resources:
- 123.45.67.89:9096
Topics:
- Topic1
ProvisionedPollerConfig:
MinimumPollers: 1
PollerGroupName: test1
SourceAccessConfigurations:
- Type: BASIC_AUTH
URI:
Ref: KafkaUserSecret
BisectBatchOnFunctionError: true
MaximumRecordAgeInSeconds: 3600
MaximumRetryAttempts: 3
FunctionResponseTypes:
- ReportBatchItemFailures

KafkaUserSecret:
Type: AWS::SecretsManager::Secret
Expand Down
2 changes: 1 addition & 1 deletion samtranslator/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.102.0"
__version__ = "1.103.0"
10 changes: 10 additions & 0 deletions samtranslator/internal/schema_source/aws_serverless_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ class MSKEventProperties(BaseModel):
DestinationConfig: Optional[PassThroughProp] # TODO: add documentation
ProvisionedPollerConfig: Optional[PassThroughProp]
SchemaRegistryConfig: Optional[PassThroughProp]
BisectBatchOnFunctionError: Optional[PassThroughProp] = mskeventproperties("BisectBatchOnFunctionError")
FunctionResponseTypes: Optional[PassThroughProp] = mskeventproperties("FunctionResponseTypes")
MaximumRecordAgeInSeconds: Optional[PassThroughProp] = mskeventproperties("MaximumRecordAgeInSeconds")
MaximumRetryAttempts: Optional[PassThroughProp] = mskeventproperties("MaximumRetryAttempts")


class MSKEvent(BaseModel):
Expand Down Expand Up @@ -463,6 +467,12 @@ class SelfManagedKafkaEventProperties(BaseModel):
Topics: PassThroughProp = selfmanagedkafkaeventproperties("Topics")
ProvisionedPollerConfig: Optional[PassThroughProp]
SchemaRegistryConfig: Optional[PassThroughProp]
BisectBatchOnFunctionError: Optional[PassThroughProp] = selfmanagedkafkaeventproperties(
"BisectBatchOnFunctionError"
)
MaximumRecordAgeInSeconds: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("MaximumRecordAgeInSeconds")
MaximumRetryAttempts: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("MaximumRetryAttempts")
FunctionResponseTypes: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("FunctionResponseTypes")


class SelfManagedKafkaEvent(BaseModel):
Expand Down
12 changes: 10 additions & 2 deletions samtranslator/internal/schema_source/sam-docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@
"StartingPosition": "The position in a stream from which to start reading\\. \n+ `AT_TIMESTAMP` \u2013 Specify a time from which to start reading records\\.\n+ `LATEST` \u2013 Read only new records\\.\n+ `TRIM_HORIZON` \u2013 Process all available records\\.\n*Valid values*: `AT_TIMESTAMP` \\| `LATEST` \\| `TRIM_HORIZON` \n*Type*: String \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"StartingPositionTimestamp": "The time from which to start reading, in Unix time seconds\\. Define `StartingPositionTimestamp` when `StartingPosition` is specified as `AT_TIMESTAMP`\\. \n*Type*: Double \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPositionTimestamp`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingpositiontimestamp) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"Stream": "The Amazon Resource Name \\(ARN\\) of the data stream or a stream consumer\\. \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`EventSourceArn`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-eventsourcearn) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\."
"Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"BisectBatchOnFunctionError": "If the function returns an error, split the batch in two and retry\\. \n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"FunctionResponseTypes": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"MaximumRecordAgeInSeconds": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"MaximumRetryAttempts": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\."
},
"sam-property-function-onfailure": {
"Destination": "The Amazon Resource Name \\(ARN\\) of the destination resource\\. \n*Type*: String \n*Required*: Conditional \n*AWS CloudFormation compatibility*: This property is similar to the [`OnFailure`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventinvokeconfig-destinationconfig-onfailure.html#cfn-lambda-eventinvokeconfig-destinationconfig-onfailure-destination) property of an `AWS::Lambda::EventInvokeConfig` resource\\. SAM will add any necessary permissions to the auto\\-generated IAM Role associated with this function to access the resource referenced in this property\\. \n*Additional notes*: If the type is Lambda/EventBridge, Destination is required\\.",
Expand Down Expand Up @@ -379,7 +383,11 @@
"SourceAccessConfigurations": "An array of the authentication protocol, VPC components, or virtual host to secure and define your event source\\. \n*Valid values*: `BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE` \n*Type*: List of [SourceAccessConfiguration](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html) \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the `[ SourceAccessConfigurations](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-sourceaccessconfigurations)` property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"StartingPosition": "The position in a stream from which to start reading\\. \n+ `AT_TIMESTAMP` \u2013 Specify a time from which to start reading records\\.\n+ `LATEST` \u2013 Read only new records\\.\n+ `TRIM_HORIZON` \u2013 Process all available records\\.\n*Valid values*: `AT_TIMESTAMP` \\| `LATEST` \\| `TRIM_HORIZON` \n*Type*: String \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"StartingPositionTimestamp": "The time from which to start reading, in Unix time seconds\\. Define `StartingPositionTimestamp` when `StartingPosition` is specified as `AT_TIMESTAMP`\\. \n*Type*: Double \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPositionTimestamp`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingpositiontimestamp) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\."
"Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"BisectBatchOnFunctionError": "If the function returns an error, split the batch in two and retry\\. \n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"FunctionResponseTypes": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"MaximumRecordAgeInSeconds": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
"MaximumRetryAttempts": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\."
},
"sam-property-function-sns": {
"FilterPolicy": "The filter policy JSON assigned to the subscription\\. For more information, see [GetSubscriptionAttributes](https://docs.aws.amazon.com/sns/latest/api/API_GetSubscriptionAttributes.html) in the Amazon Simple Notification Service API Reference\\. \n*Type*: [SnsFilterPolicy](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-subscription.html#cfn-sns-subscription-filterpolicy) \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FilterPolicy`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-subscription.html#cfn-sns-subscription-filterpolicy) property of an `AWS::SNS::Subscription` resource\\.",
Expand Down
9 changes: 6 additions & 3 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
if destination_type:
# delete this field as its used internally for SAM to determine the policy
del on_failure["Type"]
# the values 'SQS', 'SNS', and 'S3' are allowed. No intrinsics are allowed
if destination_type not in ["SQS", "SNS", "S3"]:
# the values 'SQS', 'SNS', 'S3', and 'Kafka' are allowed. No intrinsics are allowed
if destination_type not in ["SQS", "SNS", "S3", "Kafka"]:
raise InvalidEventException(
self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'"
self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', 'S3', and 'Kafka'"
)
if destination_type == "SQS":
queue_arn = on_failure.get("Destination")
Expand All @@ -225,6 +225,9 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
destination_config_policy = IAMRolePolicies().s3_send_event_payload_role_policy(
s3_arn, self.logical_id
)
elif destination_type == "Kafka":
# No policy generation for Kafka destinations - pass through
pass

lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig

Expand Down
Loading
Loading