Skip to content

Commit 8096018

Browse files
refactor(parser): Improve Kinesis models with examples and descriptions (#7092)
Improve Kinesis models with examples and descriptions
1 parent b8ae35b commit 8096018

File tree

3 files changed

+156
-40
lines changed

3 files changed

+156
-40
lines changed

aws_lambda_powertools/utilities/parser/models/kinesis.py

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import zlib
33
from typing import Dict, List, Literal, Type, Union
44

5-
from pydantic import BaseModel, field_validator
5+
from pydantic import BaseModel, Field, field_validator
66

77
from aws_lambda_powertools.shared.functions import base64_decode
88
from aws_lambda_powertools.utilities.parser.models.cloudwatch import (
@@ -11,26 +11,61 @@
1111

1212

1313
class KinesisDataStreamRecordPayload(BaseModel):
14-
kinesisSchemaVersion: str
15-
partitionKey: str
16-
sequenceNumber: str
17-
data: Union[bytes, Type[BaseModel], BaseModel] # base64 encoded str is parsed into bytes
18-
approximateArrivalTimestamp: float
14+
kinesisSchemaVersion: str = Field(
15+
description="The version of the Kinesis Data Streams record format.",
16+
examples=["1.0"],
17+
)
18+
partitionKey: str = Field(
19+
description="The partition key that was used to place the record in the stream.",
20+
examples=["user123", "device-001", "order-12345"],
21+
)
22+
sequenceNumber: str = Field(
23+
description="The unique sequence number for the record within the shard.",
24+
examples=[
25+
"49590338271490256608559692538361571095921575989136588898",
26+
"49545115243490985018280067714973144582180062593244200961",
27+
],
28+
)
29+
data: Union[bytes, Type[BaseModel], BaseModel] = Field( # base64 encoded str is parsed into bytes
30+
description="The data payload of the record. Base64 encoded string is automatically decoded to bytes.",
31+
)
32+
approximateArrivalTimestamp: float = Field(
33+
description="The approximate time that the record was inserted into the stream (Unix timestamp).",
34+
examples=[1428537600.0, 1609459200.5],
35+
)
1936

2037
@field_validator("data", mode="before")
2138
def data_base64_decode(cls, value):
2239
return base64_decode(value)
2340

2441

2542
class KinesisDataStreamRecord(BaseModel):
26-
eventSource: Literal["aws:kinesis"]
27-
eventVersion: str
28-
eventID: str
29-
eventName: Literal["aws:kinesis:record"]
30-
invokeIdentityArn: str
31-
awsRegion: str
32-
eventSourceARN: str
33-
kinesis: KinesisDataStreamRecordPayload
43+
eventSource: Literal["aws:kinesis"] = Field(
44+
description="The AWS service that generated the event.",
45+
examples=["aws:kinesis"],
46+
)
47+
eventVersion: str = Field(description="The version of the event schema.", examples=["1.0"])
48+
eventID: str = Field(
49+
description="A unique identifier for the event.",
50+
examples=["shardId-000000000006:49590338271490256608559692538361571095921575989136588898"],
51+
)
52+
eventName: Literal["aws:kinesis:record"] = Field(
53+
description="The name of the event type.",
54+
examples=["aws:kinesis:record"],
55+
)
56+
invokeIdentityArn: str = Field(
57+
description="The ARN of the IAM role used to invoke the Lambda function.",
58+
examples=["arn:aws:iam::123456789012:role/lambda-kinesis-role"],
59+
)
60+
awsRegion: str = Field(
61+
description="The AWS region where the Kinesis stream is located.",
62+
examples=["us-east-1", "us-west-2", "eu-west-1"],
63+
)
64+
eventSourceARN: str = Field(
65+
description="The ARN of the Kinesis stream that generated the event.",
66+
examples=["arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"],
67+
)
68+
kinesis: KinesisDataStreamRecordPayload = Field(description="The Kinesis-specific data for the record.")
3469

3570
def decompress_zlib_record_data_as_json(self) -> Dict:
3671
"""Decompress Kinesis Record bytes data zlib compressed to JSON"""
@@ -41,7 +76,10 @@ def decompress_zlib_record_data_as_json(self) -> Dict:
4176

4277

4378
class KinesisDataStreamModel(BaseModel):
44-
Records: List[KinesisDataStreamRecord]
79+
Records: List[KinesisDataStreamRecord] = Field(
80+
description="A list of Kinesis Data Stream records that triggered the Lambda function.",
81+
examples=[[]],
82+
)
4583

4684

4785
def extract_cloudwatch_logs_from_event(event: KinesisDataStreamModel) -> List[CloudWatchLogsDecode]:
Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,81 @@
11
from typing import List, Optional, Type, Union
22

3-
from pydantic import BaseModel, PositiveInt, field_validator
3+
from pydantic import BaseModel, Field, PositiveInt, field_validator
44

55
from aws_lambda_powertools.shared.functions import base64_decode
66

77

88
class KinesisFirehoseRecordMetadata(BaseModel):
9-
shardId: str
10-
partitionKey: str
11-
approximateArrivalTimestamp: PositiveInt
12-
sequenceNumber: str
13-
subsequenceNumber: int
9+
shardId: str = Field(
10+
description="The shard ID of the Kinesis stream record.",
11+
examples=["shardId-000000000000", "shardId-000000000001"],
12+
)
13+
partitionKey: str = Field(
14+
description="The partition key of the Kinesis stream record.",
15+
examples=["user123", "device-001", "transaction-456"],
16+
)
17+
approximateArrivalTimestamp: PositiveInt = Field(
18+
description="The approximate time when the record arrived in the Kinesis stream \
19+
(Unix timestamp in milliseconds).",
20+
examples=[1428537600000, 1609459200500],
21+
)
22+
sequenceNumber: str = Field(
23+
description="The sequence number of the Kinesis stream record.",
24+
examples=["49590338271490256608559692538361571095921575989136588898"],
25+
)
26+
subsequenceNumber: int = Field(
27+
description="The subsequence number for records that share the same sequence number.",
28+
examples=[0, 1, 2],
29+
)
1430

1531

1632
class KinesisFirehoseRecord(BaseModel):
17-
data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes
18-
recordId: str
19-
approximateArrivalTimestamp: PositiveInt
20-
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None
33+
data: Union[bytes, Type[BaseModel]] = Field( # base64 encoded str is parsed into bytes
34+
description="The data payload of the record. Base64 encoded string is automatically decoded to bytes.",
35+
)
36+
recordId: str = Field(
37+
description="A unique identifier for the record within the batch.",
38+
examples=[
39+
"49546986683135544286507457936321625675700192471156785154",
40+
"49546986683135544286507457936321625675700192471156785155",
41+
],
42+
)
43+
approximateArrivalTimestamp: PositiveInt = Field(
44+
description="The approximate time when the record arrived in Kinesis Data Firehose \
45+
(Unix timestamp in milliseconds).",
46+
examples=[1428537600000, 1609459200500],
47+
)
48+
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = Field(
49+
None,
50+
description="Metadata about the original Kinesis stream record \
51+
(only present when the delivery stream source is a Kinesis stream).",
52+
)
2153

2254
@field_validator("data", mode="before")
2355
def data_base64_decode(cls, value):
2456
return base64_decode(value)
2557

2658

2759
class KinesisFirehoseModel(BaseModel):
28-
invocationId: str
29-
deliveryStreamArn: str
30-
region: str
31-
sourceKinesisStreamArn: Optional[str] = None
32-
records: List[KinesisFirehoseRecord]
60+
invocationId: str = Field(
61+
description="A unique identifier for the Lambda invocation.",
62+
examples=["invocationIdExample", "12345678-1234-1234-1234-123456789012"],
63+
)
64+
deliveryStreamArn: str = Field(
65+
description="The ARN of the Kinesis Data Firehose delivery stream.",
66+
examples=["arn:aws:firehose:us-east-1:123456789012:deliverystream/my-delivery-stream"],
67+
)
68+
region: str = Field(
69+
description="The AWS region where the delivery stream is located.",
70+
examples=["us-east-1", "us-west-2", "eu-west-1"],
71+
)
72+
sourceKinesisStreamArn: Optional[str] = Field(
73+
None,
74+
description="The ARN of the source Kinesis stream \
75+
(only present when the delivery stream source is a Kinesis stream).",
76+
examples=["arn:aws:kinesis:us-east-1:123456789012:stream/my-source-stream"],
77+
)
78+
records: List[KinesisFirehoseRecord] = Field(
79+
description="A list of records to be processed by the Lambda function.",
80+
examples=[[]],
81+
)
Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
from typing import List, Optional
33

4-
from pydantic import BaseModel, PositiveInt, field_validator
4+
from pydantic import BaseModel, Field, PositiveInt, field_validator
55

66
from aws_lambda_powertools.shared.functions import base64_decode
77
from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata
@@ -10,10 +10,21 @@
1010

1111

1212
class KinesisFirehoseSqsRecord(BaseModel):
13-
data: SqsRecordModel
14-
recordId: str
15-
approximateArrivalTimestamp: PositiveInt
16-
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None
13+
data: SqsRecordModel = Field(description="The SQS record data that was delivered through Kinesis Data Firehose.")
14+
recordId: str = Field(
15+
description="A unique identifier for the record within the batch.",
16+
examples=["49546986683135544286507457936321625675700192471156785154"],
17+
)
18+
approximateArrivalTimestamp: PositiveInt = Field(
19+
description="The approximate time when the record arrived in Kinesis Data Firehose \
20+
(Unix timestamp in milliseconds).",
21+
examples=[1428537600000, 1609459200500],
22+
)
23+
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = Field(
24+
None,
25+
description="Metadata about the original Kinesis stream record \
26+
(only present when the delivery stream source is a Kinesis stream).",
27+
)
1728

1829
@field_validator("data", mode="before")
1930
def data_base64_decode(cls, value):
@@ -22,8 +33,26 @@ def data_base64_decode(cls, value):
2233

2334

2435
class KinesisFirehoseSqsModel(BaseModel):
25-
invocationId: str
26-
deliveryStreamArn: str
27-
region: str
28-
sourceKinesisStreamArn: Optional[str] = None
29-
records: List[KinesisFirehoseSqsRecord]
36+
invocationId: str = Field(
37+
description="A unique identifier for the Lambda invocation.",
38+
examples=["invocationIdExample", "12345678-1234-1234-1234-123456789012"],
39+
)
40+
deliveryStreamArn: str = Field(
41+
description="The ARN of the Kinesis Data Firehose delivery stream.",
42+
examples=["arn:aws:firehose:us-east-1:123456789012:deliverystream/my-sqs-delivery-stream"],
43+
)
44+
region: str = Field(
45+
description="The AWS region where the delivery stream is located.",
46+
examples=["us-east-1", "us-west-2", "eu-west-1"],
47+
)
48+
sourceKinesisStreamArn: Optional[str] = Field(
49+
None,
50+
description="The ARN of the source Kinesis stream \
51+
(only present when the delivery stream source is a Kinesis stream).",
52+
examples=["arn:aws:kinesis:us-east-1:123456789012:stream/my-source-stream"],
53+
)
54+
records: List[KinesisFirehoseSqsRecord] = Field(
55+
description="A list of SQS records delivered through Kinesis Data Firehose \
56+
to be processed by the Lambda function.",
57+
examples=[[]],
58+
)

0 commit comments

Comments
 (0)