Skip to content

Commit edaf657

Browse files
Merge branch 'develop' into refactor(parser)-s3-parser-fields
2 parents 65ab4b0 + bf5a80f commit edaf657

File tree

2 files changed

+116
-23
lines changed

2 files changed

+116
-23
lines changed

aws_lambda_powertools/utilities/parser/models/kafka.py

Lines changed: 92 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,76 @@
11
from datetime import datetime
22
from typing import Dict, List, Literal, Optional, Type, Union
33

4-
from pydantic import BaseModel, field_validator
4+
from pydantic import BaseModel, Field, field_validator
55

66
from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string, decode_header_bytes
77

88
SERVERS_DELIMITER = ","
99

1010

1111
class KafkaRecordSchemaMetadata(BaseModel):
    """Schema registry metadata attached to a Kafka record key or value."""

    dataFormat: str = Field(
        description="The data format of the schema (e.g., AVRO, JSON).",
        examples=["AVRO", "JSON", "PROTOBUF"],
    )
    schemaId: str = Field(
        description="The unique identifier of the schema.",
        examples=["1234", "5678", "schema-abc-123"],
    )
1420

1521

1622
class KafkaRecordModel(BaseModel):
    """A single record consumed from a Kafka topic-partition.

    Key, value, and header values arrive base64-encoded in the Lambda
    payload; `mode="before"` validators on this model (defined below)
    decode them after field collection.
    """

    topic: str = Field(
        description="The Kafka topic name from which the record originated.",
        examples=["mytopic", "user-events", "order-processing", "mymessage-with-unsigned"],
    )
    partition: int = Field(
        description="The partition number within the topic from which the record was consumed.",
        examples=[0, 1, 5, 10],
    )
    offset: int = Field(
        description="The offset of the record within the partition.",
        examples=[15, 123, 456789, 1000000],
    )
    # Epoch-milliseconds in the raw event; pydantic coerces to datetime.
    timestamp: datetime = Field(
        description="The timestamp of the record.",
        examples=[1545084650987, 1640995200000, 1672531200000],
    )
    timestampType: str = Field(
        description="The type of timestamp (CREATE_TIME or LOG_APPEND_TIME).",
        examples=["CREATE_TIME", "LOG_APPEND_TIME"],
    )
    key: Optional[bytes] = Field(
        default=None,
        description="The message key, base64-encoded. Can be null for messages without keys.",
        examples=["cmVjb3JkS2V5", "dXNlci0xMjM=", "b3JkZXItNDU2", None],
    )
    value: Union[str, Type[BaseModel]] = Field(
        description="The message value, base64-encoded.",
        examples=[
            "eyJrZXkiOiJ2YWx1ZSJ9",
            "eyJtZXNzYWdlIjogIkhlbGxvIEthZmthIn0=",
            "eyJ1c2VyX2lkIjogMTIzLCAiYWN0aW9uIjogImxvZ2luIn0=",
        ],
    )
    headers: List[Dict[str, bytes]] = Field(
        description="A list of message headers as key-value pairs with byte array values.",
        examples=[
            [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
            [{"contentType": [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]}],
            [],
        ],
    )
    keySchemaMetadata: Optional[KafkaRecordSchemaMetadata] = Field(
        default=None,
        description="Schema metadata for the message key when using schema registry.",
        examples=[{"dataFormat": "AVRO", "schemaId": "1234"}, None],
    )
    valueSchemaMetadata: Optional[KafkaRecordSchemaMetadata] = Field(
        default=None,
        description="Schema metadata for the message value when using schema registry.",
        examples=[{"dataFormat": "AVRO", "schemaId": "1234"}, None],
    )
2774

2875
# key is optional; only decode if not None
2976
@field_validator("key", mode="before")
@@ -44,8 +91,23 @@ def decode_headers_list(cls, value):
4491

4592

4693
class KafkaBaseEventModel(BaseModel):
    """Common fields shared by the MSK and self-managed Kafka event models."""

    # The raw event carries a comma-delimited string; a `mode="before"`
    # validator (defined below) splits it into a list.
    bootstrapServers: List[str] = Field(
        description="A list of Kafka bootstrap servers (broker endpoints).",
        examples=[
            ["b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092"],
            [
                "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
                "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
            ],
        ],
    )
    records: Dict[str, List[KafkaRecordModel]] = Field(
        description="A dictionary mapping topic-partition combinations to lists of Kafka records.",
        examples=[
            {"mytopic-0": [{"topic": "mytopic", "partition": 0, "offset": 15}]},
            {"user-events-1": [{"topic": "user-events", "partition": 1, "offset": 123}]},
        ],
    )
49111

50112
@field_validator("bootstrapServers", mode="before")
51113
def split_servers(cls, value):
@@ -59,7 +121,10 @@ class KafkaSelfManagedEventModel(KafkaBaseEventModel):
59121
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
60122
"""
61123

62-
eventSource: Literal["SelfManagedKafka"]
124+
eventSource: Literal["SelfManagedKafka"] = Field(
125+
description="The event source identifier for self-managed Kafka.",
126+
examples=["SelfManagedKafka"],
127+
)
63128

64129

65130
class KafkaMskEventModel(KafkaBaseEventModel):
    """Fully managed Apache Kafka (Amazon MSK) event trigger.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    eventSource: Literal["aws:kafka"] = Field(
        description="The AWS service that invoked the function.",
        examples=["aws:kafka"],
    )
    eventSourceArn: str = Field(
        description="The Amazon Resource Name (ARN) of the MSK cluster.",
        examples=[
            "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
            "arn:aws:kafka:eu-central-1:123456789012:cluster/MyCluster/xyz789-1234-5678-90ab-cdef12345678-2",
        ],
    )

aws_lambda_powertools/utilities/parser/models/transfer_family.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,27 @@
55

66

77
class TransferFamilyAuthorizer(BaseModel):
    """Authentication request sent by AWS Transfer Family to a custom identity provider."""

    username: str = Field(
        description="The username of the user attempting to authenticate.",
        examples=["bobusa", "john.doe", "sftp-user-123", "data-transfer-user"],
    )
    # None when the client authenticates with a key instead of a password.
    password: Optional[str] = Field(
        default=None,
        description="The password for authentication.",
        examples=["<password>", "<user-password>", None],
    )
    protocol: Literal["SFTP", "FTP", "FTPS"] = Field(
        description="The protocol used for the connection.",
        examples=["SFTP", "FTPS", "FTP"],
    )
    server_id: str = Field(
        ...,
        alias="serverId",
        description="The server ID of the Transfer Family server.",
        examples=["s-abcd123456", "s-1234567890abcdef0", "s-example123"],
    )
    source_ip: IPvAnyAddress = Field(
        ...,
        alias="sourceIp",
        description="The IP address of the client connecting to the Transfer Family server.",
    )

0 commit comments

Comments
 (0)