Skip to content

Commit 6a786f0

Browse files
authored
Add support for AT_TIMESTAMP and StartingPositionTimestamp (#2758)
Co-authored-by: Gavin Zhang <[email protected]>
1 parent d74a89c commit 6a786f0

File tree

11 files changed

+594
-17
lines changed

11 files changed

+594
-17
lines changed

integration/combination/test_function_with_kinesis.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from unittest.case import skipIf
2-
2+
from parameterized import parameterized
33
from integration.helpers.base_test import BaseTest
44
from integration.helpers.resource import current_region_does_not_support
55
from integration.config.service_names import KINESIS
@@ -15,18 +15,26 @@ def test_function_with_kinesis_trigger(self):
1515
kinesis_stream = kinesis_client.describe_stream(StreamName=kinesis_id)["StreamDescription"]
1616

1717
lambda_client = self.client_provider.lambda_client
18-
function_name = self.get_physical_id_by_type("AWS::Lambda::Function")
19-
lambda_function_arn = lambda_client.get_function_configuration(FunctionName=function_name)["FunctionArn"]
20-
21-
event_source_mapping_arn = self.get_physical_id_by_type("AWS::Lambda::EventSourceMapping")
22-
event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_arn)
23-
event_source_mapping_batch_size = event_source_mapping_result["BatchSize"]
24-
event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
25-
event_source_mapping_kinesis_stream_arn = event_source_mapping_result["EventSourceArn"]
26-
27-
self.assertEqual(event_source_mapping_batch_size, 100)
28-
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
29-
self.assertEqual(event_source_mapping_kinesis_stream_arn, kinesis_stream["StreamARN"])
18+
for function_name, event_source_mapping_arn in [
19+
(
20+
self.get_physical_id_by_logical_id("MyLambdaFunction"),
21+
self.get_physical_id_by_logical_id("MyLambdaFunctionKinesisStream"),
22+
),
23+
(
24+
self.get_physical_id_by_logical_id("MyLambdaFunction2"),
25+
self.get_physical_id_by_logical_id("MyLambdaFunction2KinesisStream"),
26+
),
27+
]:
28+
lambda_function_arn = lambda_client.get_function_configuration(FunctionName=function_name)["FunctionArn"]
29+
30+
event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_arn)
31+
event_source_mapping_batch_size = event_source_mapping_result["BatchSize"]
32+
event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
33+
event_source_mapping_kinesis_stream_arn = event_source_mapping_result["EventSourceArn"]
34+
35+
self.assertEqual(event_source_mapping_batch_size, 100)
36+
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
37+
self.assertEqual(event_source_mapping_kinesis_stream_arn, kinesis_stream["StreamARN"])
3038

3139

3240
@skipIf(current_region_does_not_support([KINESIS]), "Kinesis is not supported in this testing region")

integration/resources/expected/combination/function_with_kinesis.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,22 @@
77
"LogicalResourceId": "MyLambdaFunctionRole",
88
"ResourceType": "AWS::IAM::Role"
99
},
10+
{
11+
"LogicalResourceId": "MyLambdaFunction2",
12+
"ResourceType": "AWS::Lambda::Function"
13+
},
14+
{
15+
"LogicalResourceId": "MyLambdaFunction2Role",
16+
"ResourceType": "AWS::IAM::Role"
17+
},
1018
{
1119
"LogicalResourceId": "MyStream",
1220
"ResourceType": "AWS::Kinesis::Stream"
1321
},
22+
{
23+
"LogicalResourceId": "MyLambdaFunction2KinesisStream",
24+
"ResourceType": "AWS::Lambda::EventSourceMapping"
25+
},
1426
{
1527
"LogicalResourceId": "MyLambdaFunctionKinesisStream",
1628
"ResourceType": "AWS::Lambda::EventSourceMapping"

integration/resources/templates/combination/function_with_kinesis.yaml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,29 @@ Resources:
2121
FunctionResponseTypes:
2222
- ReportBatchItemFailures
2323

24+
MyLambdaFunction2:
25+
Type: AWS::Serverless::Function
26+
Properties:
27+
Handler: index.handler
28+
Runtime: nodejs14.x
29+
CodeUri: ${codeuri}
30+
MemorySize: 128
31+
32+
Events:
33+
KinesisStream:
34+
Type: Kinesis
35+
Properties:
36+
Stream:
37+
# Connect with the stream we have created in this template
38+
Fn::GetAtt: [MyStream, Arn]
39+
40+
BatchSize: 100
41+
StartingPosition: AT_TIMESTAMP
42+
StartingPositionTimestamp: 1671489395
43+
TumblingWindowInSeconds: 120
44+
FunctionResponseTypes:
45+
- ReportBatchItemFailures
46+
2447
MyStream:
2548
Type: AWS::Kinesis::Stream
2649
Properties:

samtranslator/model/eventsources/pull.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from typing import Any, Dict, List, Optional
22

33
from samtranslator.metrics.method_decorator import cw_timer
4-
from samtranslator.model import ResourceMacro, PropertyType
4+
from samtranslator.model import ResourceMacro, PropertyType, PassThroughProperty
55
from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX
66
from samtranslator.model.types import IS_DICT, is_type, IS_STR
7+
from samtranslator.schema.common import PassThrough
78
from samtranslator.model.intrinsics import is_intrinsic
89

910
from samtranslator.model.lambda_ import LambdaEventSourceMapping
@@ -37,7 +38,8 @@ class PullEventSource(ResourceMacro):
3738
"Stream": PropertyType(False, IS_STR),
3839
"Queue": PropertyType(False, IS_STR),
3940
"BatchSize": PropertyType(False, is_type(int)),
40-
"StartingPosition": PropertyType(False, IS_STR),
41+
"StartingPosition": PassThroughProperty(False),
42+
"StartingPositionTimestamp": PassThroughProperty(False),
4143
"Enabled": PropertyType(False, is_type(bool)),
4244
"MaximumBatchingWindowInSeconds": PropertyType(False, is_type(int)),
4345
"MaximumRetryAttempts": PropertyType(False, is_type(int)),
@@ -60,7 +62,8 @@ class PullEventSource(ResourceMacro):
6062
Stream: Optional[Intrinsicable[str]]
6163
Queue: Optional[Intrinsicable[str]]
6264
BatchSize: Optional[Intrinsicable[int]]
63-
StartingPosition: Optional[Intrinsicable[str]]
65+
StartingPosition: Optional[PassThrough]
66+
StartingPositionTimestamp: Optional[PassThrough]
6467
Enabled: Optional[bool]
6568
MaximumBatchingWindowInSeconds: Optional[Intrinsicable[int]]
6669
MaximumRetryAttempts: Optional[Intrinsicable[int]]
@@ -124,6 +127,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
124127
lambda_eventsourcemapping.FunctionName = function_name_or_arn
125128
lambda_eventsourcemapping.EventSourceArn = self.Stream or self.Queue or self.Broker
126129
lambda_eventsourcemapping.StartingPosition = self.StartingPosition
130+
lambda_eventsourcemapping.StartingPositionTimestamp = self.StartingPositionTimestamp
127131
lambda_eventsourcemapping.BatchSize = self.BatchSize
128132
lambda_eventsourcemapping.Enabled = self.Enabled
129133
lambda_eventsourcemapping.MaximumBatchingWindowInSeconds = self.MaximumBatchingWindowInSeconds

samtranslator/model/lambda_.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import Optional, Dict, Any, List, Union
2-
from samtranslator.model import PropertyType, Resource
2+
from samtranslator.model import PropertyType, Resource, PassThroughProperty
33
from samtranslator.model.types import IS_DICT, is_type, one_of, IS_STR, list_of, any_type
44
from samtranslator.model.intrinsics import fnGetAtt, ref
55
from samtranslator.utils.types import Intrinsicable
@@ -101,6 +101,7 @@ class LambdaEventSourceMapping(Resource):
101101
"DestinationConfig": PropertyType(False, IS_DICT),
102102
"ParallelizationFactor": PropertyType(False, is_type(int)),
103103
"StartingPosition": PropertyType(False, IS_STR),
104+
"StartingPositionTimestamp": PassThroughProperty(False),
104105
"Topics": PropertyType(False, is_type(list)),
105106
"Queues": PropertyType(False, is_type(list)),
106107
"SourceAccessConfigurations": PropertyType(False, is_type(list)),

samtranslator/schema/aws_serverless_function.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ class KinesisEventProperties(BaseModel):
157157
MaximumRetryAttempts: Optional[PassThrough] = kinesiseventproperties("MaximumRetryAttempts")
158158
ParallelizationFactor: Optional[PassThrough] = kinesiseventproperties("ParallelizationFactor")
159159
StartingPosition: PassThrough = kinesiseventproperties("StartingPosition")
160+
StartingPositionTimestamp: PassThrough # TODO: add documentation
160161
Stream: PassThrough = kinesiseventproperties("Stream")
161162
TumblingWindowInSeconds: Optional[PassThrough] = kinesiseventproperties("TumblingWindowInSeconds")
162163

@@ -178,6 +179,7 @@ class DynamoDBEventProperties(BaseModel):
178179
MaximumRetryAttempts: Optional[PassThrough] = dynamodbeventproperties("MaximumRetryAttempts")
179180
ParallelizationFactor: Optional[PassThrough] = dynamodbeventproperties("ParallelizationFactor")
180181
StartingPosition: PassThrough = dynamodbeventproperties("StartingPosition")
182+
StartingPositionTimestamp: PassThrough # TODO: add documentation
181183
Stream: PassThrough = dynamodbeventproperties("Stream")
182184
TumblingWindowInSeconds: Optional[PassThrough] = dynamodbeventproperties("TumblingWindowInSeconds")
183185

@@ -353,6 +355,7 @@ class MSKEventProperties(BaseModel):
353355
FilterCriteria: Optional[PassThrough] = mskeventproperties("FilterCriteria")
354356
MaximumBatchingWindowInSeconds: Optional[PassThrough] = mskeventproperties("MaximumBatchingWindowInSeconds")
355357
StartingPosition: PassThrough = mskeventproperties("StartingPosition")
358+
StartingPositionTimestamp: PassThrough # TODO: add documentation
356359
Stream: PassThrough = mskeventproperties("Stream")
357360
Topics: PassThrough = mskeventproperties("Topics")
358361

samtranslator/schema/schema.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2118,6 +2118,9 @@
21182118
"description": "The position in a stream from which to start reading\\. \n*Valid values*: `TRIM_HORIZON` or `LATEST` \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
21192119
"markdownDescription": "The position in a stream from which to start reading\\. \n*Valid values*: `TRIM_HORIZON` or `LATEST` \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\."
21202120
},
2121+
"StartingPositionTimestamp": {
2122+
"title": "Startingpositiontimestamp"
2123+
},
21212124
"Stream": {
21222125
"title": "Stream",
21232126
"description": "The Amazon Resource Name \\(ARN\\) of the data stream or a stream consumer\\. \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`EventSourceArn`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-eventsourcearn) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
@@ -2220,6 +2223,9 @@
22202223
"description": "The position in a stream from which to start reading\\. \n*Valid values*: `TRIM_HORIZON` or `LATEST` \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
22212224
"markdownDescription": "The position in a stream from which to start reading\\. \n*Valid values*: `TRIM_HORIZON` or `LATEST` \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\."
22222225
},
2226+
"StartingPositionTimestamp": {
2227+
"title": "Startingpositiontimestamp"
2228+
},
22232229
"Stream": {
22242230
"title": "Stream",
22252231
"description": "The Amazon Resource Name \\(ARN\\) of the DynamoDB stream\\. \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`EventSourceArn`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-eventsourcearn) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
@@ -3454,6 +3460,9 @@
34543460
"description": "The position in a stream from which to start reading\\. \n*Valid values*: `TRIM_HORIZON` or `LATEST` \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
34553461
"markdownDescription": "The position in a stream from which to start reading\\. \n*Valid values*: `TRIM_HORIZON` or `LATEST` \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\."
34563462
},
3463+
"StartingPositionTimestamp": {
3464+
"title": "Startingpositiontimestamp"
3465+
},
34573466
"Stream": {
34583467
"title": "Stream",
34593468
"description": "The Amazon Resource Name \\(ARN\\) of the data stream or a stream consumer\\. \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`EventSourceArn`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-eventsourcearn) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
Resources:
2+
KinesisTriggerFunction:
3+
Type: AWS::Serverless::Function
4+
Properties:
5+
Timeout: 5
6+
Runtime: nodejs12.x
7+
MemorySize: 128
8+
Tracing: Active
9+
AutoPublishAlias: live
10+
InlineCode: |
11+
exports.handler = async (event, context, callback) => {
12+
return {
13+
statusCode: 200,
14+
body: 'Success'
15+
}
16+
}
17+
Handler: trigger.handler
18+
Description: >
19+
This function triggered when a file is uploaded in a stream (Kinesis)
20+
Events:
21+
Stream:
22+
Type: Kinesis
23+
Properties:
24+
Stream: !GetAtt KinesisStream.Arn
25+
BatchSize: 500
26+
StartingPosition: AT_TIMESTAMP
27+
StartingPositionTimestamp: 1671489395
28+
ParallelizationFactor: 1
29+
MaximumRetryAttempts: 1000
30+
BisectBatchOnFunctionError: true
31+
Policies:
32+
- KinesisStreamReadPolicy:
33+
StreamName: !Ref KinesisStream
34+
35+
KinesisStream:
36+
Type: AWS::Kinesis::Stream
37+
Properties:
38+
Name: KinesisStream
39+
RetentionPeriodHours: 24
40+
ShardCount: 1
41+
StreamEncryption:
42+
EncryptionType: KMS
43+
KeyId: alias/aws/kinesis

0 commit comments

Comments
 (0)